From 252e21e90bbaf3b1cb9df0dd43318039a28a4d8d Mon Sep 17 00:00:00 2001 From: Martin Medek Date: Tue, 28 Nov 2023 10:41:44 +0100 Subject: [PATCH] Replaced old redis sink tests, fixed maven to properly run ITs and tests --- debezium-server-redis/pom.xml | 12 ++ .../DebeziumServerConfigBuilder.java | 14 +- .../DebeziumTestContainerWrapper.java | 2 +- .../debezium/server/redis/RedisOffsetIT.java | 100 ------------ .../server/redis/RedisOffsetTestProfile.java | 31 ---- .../server/redis/RedisSSLStreamIT.java | 55 ------- .../redis/RedisSSLStreamTestProfile.java | 41 ----- .../RedisSSLTestResourceLifecycleManager.java | 84 ---------- .../server/redis/RedisSchemaHistoryIT.java | 135 ---------------- .../redis/RedisSchemaHistoryTestProfile.java | 36 ----- .../debezium/server/redis/RedisStreamIT.java | 146 ------------------ .../redis/RedisStreamMemoryThresholdIT.java | 60 ------- ...RedisStreamMemoryThresholdTestProfile.java | 19 --- .../server/redis/RedisStreamMessageIT.java | 66 -------- .../redis/RedisStreamMessageTestProfile.java | 19 --- .../server/redis/RedisStreamTestProfile.java | 37 ----- .../RedisTestResourceLifecycleManager.java | 82 ---------- .../server/redis/{wip => }/TestConstants.java | 2 +- .../TestContainersRedisTestBase.java | 18 +-- .../{wip => }/TestContainersSslStreamIT.java | 18 +-- .../{wip => }/TestContainersStreamIT.java | 16 +- .../redis/{wip => }/TestProperties.java | 5 +- .../io/debezium/server/redis/TestUtils.java | 59 ++++--- .../debezium/server/redis/wip/TestUtils.java | 58 ------- pom.xml | 2 - 25 files changed, 83 insertions(+), 1034 deletions(-) rename debezium-server-redis/src/test/java/io/debezium/server/redis/{wip => }/DebeziumServerConfigBuilder.java (86%) rename debezium-server-redis/src/test/java/io/debezium/server/redis/{wip => }/DebeziumTestContainerWrapper.java (98%) delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetIT.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetTestProfile.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamIT.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamTestProfile.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLTestResourceLifecycleManager.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryIT.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamIT.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdIT.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageIT.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageTestProfile.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/RedisTestResourceLifecycleManager.java rename debezium-server-redis/src/test/java/io/debezium/server/redis/{wip => }/TestConstants.java (96%) rename debezium-server-redis/src/test/java/io/debezium/server/redis/{wip => }/TestContainersRedisTestBase.java (78%) rename debezium-server-redis/src/test/java/io/debezium/server/redis/{wip => }/TestContainersSslStreamIT.java (80%) rename debezium-server-redis/src/test/java/io/debezium/server/redis/{wip => }/TestContainersStreamIT.java (94%) rename debezium-server-redis/src/test/java/io/debezium/server/redis/{wip => }/TestProperties.java (70%) delete mode 100644 debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java diff --git a/debezium-server-redis/pom.xml b/debezium-server-redis/pom.xml index 96047f5f..b114eaf7 100644 --- a/debezium-server-redis/pom.xml +++ b/debezium-server-redis/pom.xml @@ -12,7 +12,10 @@ jar + true java + debezium-server-test + ${artifactId} @@ -154,9 +157,18 @@ ${version.debezium} ${quarkus.container-image.group} + ${quarkus.container-image.name} + + + org.apache.maven.plugins + maven-surefire-plugin + + **/*IT.java + + diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumServerConfigBuilder.java similarity index 86% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumServerConfigBuilder.java index a1930c2d..ea82cc70 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumServerConfigBuilder.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumServerConfigBuilder.java @@ -3,14 +3,14 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis.wip; +package io.debezium.server.redis; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_DATABASE; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PASSWORD; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_USER; -import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT; -import static io.debezium.server.redis.wip.TestConstants.REDIS_SSL_PORT; +import static io.debezium.server.redis.TestConstants.MYSQL_DATABASE; +import static io.debezium.server.redis.TestConstants.MYSQL_PASSWORD; +import static io.debezium.server.redis.TestConstants.MYSQL_PORT; +import static io.debezium.server.redis.TestConstants.MYSQL_USER; +import static io.debezium.server.redis.TestConstants.REDIS_PORT; +import static io.debezium.server.redis.TestConstants.REDIS_SSL_PORT; import java.util.HashMap; import java.util.List; diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumTestContainerWrapper.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumTestContainerWrapper.java similarity index 98% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumTestContainerWrapper.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumTestContainerWrapper.java index 5b767047..deed72bb 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/DebeziumTestContainerWrapper.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/DebeziumTestContainerWrapper.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis.wip; +package io.debezium.server.redis; import static org.awaitility.Awaitility.await; import static org.testcontainers.containers.output.OutputFrame.OutputType.STDOUT; diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetIT.java deleted file mode 100644 index 000f4224..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetIT.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.Map; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import io.debezium.connector.postgresql.connection.PostgresConnection; -import io.debezium.doc.FixFor; -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.quarkus.test.junit.TestProfile; - -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; - -/** - * Integration test that verifies reading and writing offsets from Redis key value store - * - * @author Oren Elias - */ -@Disabled -@QuarkusIntegrationTest -@TestProfile(RedisOffsetTestProfile.class) -@QuarkusTestResource(RedisTestResourceLifecycleManager.class) -public class RedisOffsetIT { - - private static final int MESSAGE_COUNT = 4; - private static final String STREAM_NAME = "testc.inventory.customers"; - private static final String OFFSETS_HASH_NAME = "metadata:debezium:offsets"; - - protected static Jedis jedis; - - @Test - @FixFor("DBZ-4509") - public void testRedisStream() throws Exception { - jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT); - - Map redisOffsets = jedis.hgetAll(OFFSETS_HASH_NAME); - assertThat(redisOffsets.size() > 0).isTrue(); - } - - /** - * Test retry mechanism when encountering Redis connectivity issues: - * 1. Make Redis to be unavailable while the server is up - * 2. Create a new table named redis_test in PostgreSQL and insert 5 records to it - * 3. Bring Redis up again and make sure the offsets have been written successfully - */ - @Test - @FixFor("DBZ-4509") - public void testRedisConnectionRetry() throws Exception { - Testing.Print.enable(); - - Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - // wait until the offsets are written for the first time - TestUtils.awaitHashSizeGte(jedis, OFFSETS_HASH_NAME, 1); - - // clear the offsets key - jedis.del(OFFSETS_HASH_NAME); - - // pause container - Testing.print("Pausing container"); - RedisTestResourceLifecycleManager.pause(); - - final PostgresConnection connection = TestUtils.getPostgresConnection(); - Testing.print("Creating new redis_test table and inserting 5 records to it"); - connection.execute( - "CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", - "INSERT INTO inventory.redis_test VALUES (1)", - "INSERT INTO inventory.redis_test VALUES (2)", - "INSERT INTO inventory.redis_test VALUES (3)", - "INSERT INTO inventory.redis_test VALUES (4)", - "INSERT INTO inventory.redis_test VALUES (5)"); - connection.close(); - - Testing.print("Sleeping for 2 seconds to flush records"); - Thread.sleep(2000); - Testing.print("Unpausing container"); - - RedisTestResourceLifecycleManager.unpause(); - Testing.print("Sleeping for 2 seconds to reconnect to redis and write offset"); - - // wait until the offsets are re-written - TestUtils.awaitHashSizeGte(jedis, OFFSETS_HASH_NAME, 1); - - Map redisOffsets = jedis.hgetAll(OFFSETS_HASH_NAME); - jedis.close(); - assertThat(redisOffsets.size() > 0).isTrue(); - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetTestProfile.java deleted file mode 100644 index 30d581a7..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisOffsetTestProfile.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; -import io.quarkus.test.junit.QuarkusTestProfile; - -public class RedisOffsetTestProfile implements QuarkusTestProfile { - - @Override - public List testResources() { - return Arrays.asList(new TestResourceEntry(PostgresTestResourceLifecycleManager.class)); - } - - @Override - public Map getConfigOverrides() { - Map config = new HashMap(); - config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - config.put("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore"); - return config; - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamIT.java deleted file mode 100644 index 05aa062b..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamIT.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import static org.junit.Assert.assertTrue; - -import java.util.Map; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.quarkus.test.junit.TestProfile; - -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; - -/** - * Integration tests for secured Redis - * - * @author Oren Elias - */ -@Disabled -@QuarkusIntegrationTest -@TestProfile(RedisSSLStreamTestProfile.class) -public class RedisSSLStreamIT { - - /** - * Verifies that all the records of a PostgreSQL table are streamed to Redis - */ - @Test - public void testRedisStream() throws Exception { - HostAndPort address = HostAndPort.from(RedisSSLTestResourceLifecycleManager.getRedisContainerAddress()); - Jedis jedis = new Jedis(address.getHost(), address.getPort(), true); - final int MESSAGE_COUNT = 4; - final String STREAM_NAME = "testc.inventory.customers"; - final String HASH_NAME = "metadata:debezium:offsets"; - - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT); - - Long streamLength = jedis.xlen(STREAM_NAME); - assertTrue("Redis Basic Stream Test Failed", streamLength == MESSAGE_COUNT); - - // wait until the offsets are re-written - TestUtils.awaitHashSizeGte(jedis, HASH_NAME, 1); - - Map redisOffsets = jedis.hgetAll(HASH_NAME); - assertTrue(redisOffsets.size() > 0); - - jedis.close(); - } -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamTestProfile.java deleted file mode 100644 index 05f0a2db..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLStreamTestProfile.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.net.URL; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; -import io.quarkus.test.junit.QuarkusTestProfile; - -public class RedisSSLStreamTestProfile implements QuarkusTestProfile { - - @Override - public List testResources() { - return Arrays.asList( - new TestResourceEntry(PostgresTestResourceLifecycleManager.class), - new TestResourceEntry(RedisSSLTestResourceLifecycleManager.class)); - } - - public Map getConfigOverrides() { - - Map config = new HashMap(); - URL keyStoreFile = RedisSSLStreamTestProfile.class.getClassLoader().getResource("ssl/client-keystore.p12"); - URL trustStoreFile = RedisSSLStreamTestProfile.class.getClassLoader().getResource("ssl/client-truststore.p12"); - - config.put("javax.net.ssl.keyStore", keyStoreFile.getPath()); - config.put("javax.net.ssl.trustStore", trustStoreFile.getPath()); - config.put("javax.net.ssl.keyStorePassword", "secret"); - config.put("javax.net.ssl.trustStorePassword", "secret"); - config.put("debezium.source.offset.storage", "io.debezium.server.redis.RedisOffsetBackingStore"); - config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - return config; - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLTestResourceLifecycleManager.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLTestResourceLifecycleManager.java deleted file mode 100644 index fa514e56..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSSLTestResourceLifecycleManager.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.testcontainers.containers.BindMode; -import org.testcontainers.containers.GenericContainer; - -import io.debezium.server.TestConfigSource; -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; - -public class RedisSSLTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager { - - public static final int REDIS_PORT = 6379; - public static final String REDIS_IMAGE = "redis"; - - private static final AtomicBoolean running = new AtomicBoolean(false); - private static final GenericContainer container = new GenericContainer<>(REDIS_IMAGE) - .withClasspathResourceMapping("ssl", "/etc/certificates", BindMode.READ_ONLY) - .withCommand( - "redis-server --tls-port 6379 --port 0 --tls-cert-file /etc/certificates/redis.crt --tls-key-file /etc/certificates/redis.key --tls-ca-cert-file /etc/certificates/ca.crt") - .withExposedPorts(REDIS_PORT); - - private static synchronized void start(boolean ignored) { - if (!running.get()) { - container.start(); - TestUtils.waitBoolean(() -> container.getLogs().contains(RedisTestResourceLifecycleManager.READY_MESSAGE)); - running.set(true); - } - } - - @Override - public Map start() { - start(true); - Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); - Testing.Files.createTestingFile(TestConfigSource.OFFSET_STORE_PATH); - - Map params = new ConcurrentHashMap<>(); - params.put("debezium.sink.type", "redis"); - params.put("debezium.source.offset.storage.redis.address", RedisSSLTestResourceLifecycleManager.getRedisContainerAddress()); - params.put("debezium.source.offset.storage.redis.ssl.enabled", "true"); - params.put("debezium.sink.redis.address", RedisSSLTestResourceLifecycleManager.getRedisContainerAddress()); - params.put("debezium.sink.redis.ssl.enabled", "true"); - params.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - params.put("debezium.source.offset.flush.interval.ms", "0"); - params.put("debezium.source.topic.prefix", "testc"); - params.put("debezium.source.schema.include.list", "inventory"); - params.put("debezium.source.table.include.list", "inventory.customers,inventory.redis_test,inventory.redis_test2"); - - return params; - } - - @Override - public void stop() { - try { - container.stop(); - } - catch (Exception e) { - // ignored - } - running.set(false); - } - - public static void pause() { - container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec(); - } - - public static void unpause() { - container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec(); - } - - public static String getRedisContainerAddress() { - start(true); - - return String.format("%s:%d", container.getContainerIpAddress(), container.getFirstMappedPort()); - } -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryIT.java deleted file mode 100644 index 689a2e15..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryIT.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.List; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junitpioneer.jupiter.RetryingTest; - -import io.debezium.config.Configuration; -import io.debezium.connector.mysql.MySqlConnectorConfig; -import io.debezium.connector.mysql.strategy.AbstractConnectorConnection; -import io.debezium.doc.FixFor; -import io.debezium.relational.history.AbstractSchemaHistoryTest; -import io.debezium.relational.history.SchemaHistory; -import io.debezium.relational.history.SchemaHistoryMetrics; -import io.debezium.testing.testcontainers.MySqlTestResourceLifecycleManager; -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.quarkus.test.junit.TestProfile; - -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.resps.StreamEntry; - -/** - * Integration test that verifies reading and writing database schema history from Redis key value store - * - * @author Oren Elias - */ -@Disabled -@QuarkusIntegrationTest -@TestProfile(RedisSchemaHistoryTestProfile.class) -@QuarkusTestResource(RedisTestResourceLifecycleManager.class) -public class RedisSchemaHistoryIT extends AbstractSchemaHistoryTest { - - private static final String STREAM_NAME = "metadata:debezium:schema_history"; - private static final int INIT_HISTORY_SIZE = 16; // Initial number of entries in the schema history stream. - - protected static Jedis jedis; - - @Override - @BeforeEach - public void beforeEach() { - super.beforeEach(); - } - - @Override - protected SchemaHistory createHistory() { - SchemaHistory history = new RedisSchemaHistory(); - - history.configure(Configuration.create() - .with("schema.history.internal.redis.address", HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())) - .build(), null, SchemaHistoryMetrics.NOOP, true); - history.start(); - return history; - } - - @Test - @FixFor("DBZ-4771") - public void testSchemaHistoryIsSaved() { - jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, INIT_HISTORY_SIZE + 1); - - final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null); - assertEquals(INIT_HISTORY_SIZE + 1, entries.size()); - assertTrue(entries.stream().anyMatch(item -> item.getFields().get("schema").contains("CREATE TABLE `customers`"))); - } - - @Test - @FixFor("DBZ-4771") - public void shouldRecordChangesAndRecoverToVariousPoints() { - super.shouldRecordChangesAndRecoverToVariousPoints(); - } - - /** - * Test retry mechanism when encountering Redis connectivity issues: - * 1. Make Redis unavailable while the server is up - * 2. Create a new table named redis_test in MySQL - * 3. Bring Redis up again and make sure the database schema has been written successfully - */ - @RetryingTest(3) - @FixFor("DBZ-4509") - public void testRedisConnectionRetry() throws Exception { - Testing.Print.enable(); - - Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - // wait until the db schema history is written for the first time - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, 1); - - // pause container - Testing.print("Pausing container"); - RedisTestResourceLifecycleManager.pause(); - - final AbstractConnectorConnection connection = getMySqlConnection(); - connection.connect(); - Testing.print("Creating new redis_test table and inserting 5 records to it"); - connection.execute("CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)"); - connection.close(); - - Testing.print("Sleeping for 2 seconds to flush records"); - Thread.sleep(2000); - Testing.print("Unpausing container"); - RedisTestResourceLifecycleManager.unpause(); - - // wait until the db schema history is written for the first time - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, INIT_HISTORY_SIZE + 1); - - final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null); - - assertEquals(INIT_HISTORY_SIZE + 1, entries.size()); - assertTrue(entries.get(INIT_HISTORY_SIZE).getFields().get("schema").contains("redis_test")); - } - - private AbstractConnectorConnection getMySqlConnection() { - final Configuration config = Configuration.create() - .with("database.user", MySqlTestResourceLifecycleManager.PRIVILEGED_USER) - .with("database.password", MySqlTestResourceLifecycleManager.PRIVILEGED_PASSWORD) - .with("database.dbname", MySqlTestResourceLifecycleManager.DBNAME) - .with("database.hostname", MySqlTestResourceLifecycleManager.HOST) - .with("database.port", MySqlTestResourceLifecycleManager.getContainer().getMappedPort(MySqlTestResourceLifecycleManager.PORT)) - .build(); - return new MySqlConnectorConfig(config).getConnectorAdapter().createConnection(config); - } -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java deleted file mode 100644 index e11813fb..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.nio.file.Path; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.debezium.testing.testcontainers.MySqlTestResourceLifecycleManager; -import io.debezium.util.Testing; -import io.quarkus.test.junit.QuarkusTestProfile; - -public class RedisSchemaHistoryTestProfile implements QuarkusTestProfile { - public static final String OFFSETS_FILE = "file-connector-offsets.txt"; - public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath(); - public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename"; - - @Override - public List testResources() { - return Arrays.asList(new TestResourceEntry(MySqlTestResourceLifecycleManager.class)); - } - - public Map getConfigOverrides() { - Map config = new HashMap(); - config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); - config.put("debezium.source.schema.history.internal", "io.debezium.server.redis.RedisSchemaHistory"); - config.put("debezium.source.database.server.id", "12345"); - return config; - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamIT.java deleted file mode 100644 index f72e1c42..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamIT.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import static org.junit.Assert.assertTrue; - -import java.util.List; -import java.util.Map; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import io.debezium.connector.postgresql.connection.PostgresConnection; -import io.debezium.doc.FixFor; -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.quarkus.test.junit.TestProfile; - -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.resps.StreamEntry; - -/** - * Integration tests that verify basic reading from PostgreSQL database and writing to Redis stream - * and retry mechanism in case of connectivity issues or OOM in Redis - * - * @author M Sazzadul Hoque - * @author Yossi Shirizli - */ -@Disabled -@QuarkusIntegrationTest -@TestProfile(RedisStreamTestProfile.class) -@QuarkusTestResource(RedisTestResourceLifecycleManager.class) -public class RedisStreamIT { - - /** - * Verifies that all the records of a PostgreSQL table are streamed to Redis - */ - @Test - public void testRedisStream() throws Exception { - Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - final int MESSAGE_COUNT = 4; - final String STREAM_NAME = "testc.inventory.customers"; - - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT); - - Long streamLength = jedis.xlen(STREAM_NAME); - assertTrue("Expected stream length of " + MESSAGE_COUNT, streamLength == MESSAGE_COUNT); - - final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null); - for (StreamEntry entry : entries) { - Map map = entry.getFields(); - assertTrue("Expected map of size 1", map.size() == 1); - Map.Entry mapEntry = map.entrySet().iterator().next(); - assertTrue("Expected json like key starting with {\"schema\":...", mapEntry.getKey().startsWith("{\"schema\":")); - assertTrue("Expected json like value starting with {\"schema\":...", mapEntry.getValue().startsWith("{\"schema\":")); - } - - jedis.close(); - } - - /** - * Test retry mechanism when encountering Redis connectivity issues: - * 1. Make Redis to be unavailable while the server is up - * 2. Create a new table named redis_test in PostgreSQL and insert 5 records to it - * 3. Bring Redis up again and make sure these records have been streamed successfully - */ - @Test - @FixFor("DBZ-4510") - public void testRedisConnectionRetry() throws Exception { - Testing.Print.enable(); - - final int MESSAGE_COUNT = 5; - final String STREAM_NAME = "testc.inventory.redis_test"; - Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - Testing.print("Pausing container"); - RedisTestResourceLifecycleManager.pause(); - - final PostgresConnection connection = TestUtils.getPostgresConnection(); - Testing.print("Creating new redis_test table and inserting 5 records to it"); - connection.execute( - "CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", - "INSERT INTO inventory.redis_test VALUES (1)", - "INSERT INTO inventory.redis_test VALUES (2)", - "INSERT INTO inventory.redis_test VALUES (3)", - "INSERT INTO inventory.redis_test VALUES (4)", - "INSERT INTO inventory.redis_test VALUES (5)"); - connection.close(); - - Testing.print("Sleeping for 3 seconds to simulate no connection errors"); - Thread.sleep(3000); - Testing.print("Unpausing container"); - RedisTestResourceLifecycleManager.unpause(); - Thread.sleep(2000); - - Long streamLength = jedis.xlen(STREAM_NAME); - Testing.print("Entries in " + STREAM_NAME + ":" + streamLength); - jedis.close(); - assertTrue("Redis Connection Test Failed", streamLength == MESSAGE_COUNT); - } - - /** - * Test retry mechanism when encountering Redis Out of Memory: - * 1. Simulate a Redis OOM by setting its max memory to 1M - * 2. Create a new table named redis_test2 in PostgreSQL and insert 50 records to it - * 3. Sleep for 1 second to simulate Redis OOM (stream does not contain 50 records) - * 4. Unlimit memory and verify that all 50 records have been streamed - */ - @Test - @FixFor("DBZ-4510") - public void testRedisOOMRetry() throws Exception { - Testing.Print.enable(); - - Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - final String STREAM_NAME = "testc.inventory.redis_test2"; - final int TOTAL_RECORDS = 50; - - Testing.print("Setting Redis' maxmemory to 1M"); - jedis.configSet("maxmemory", "1M"); - - PostgresConnection connection = TestUtils.getPostgresConnection(); - connection.execute("CREATE TABLE inventory.redis_test2 " + - "(id VARCHAR(100) PRIMARY KEY, " + - "first_name VARCHAR(100), " + - "last_name VARCHAR(100))"); - connection.execute(String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " + - "SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", TOTAL_RECORDS)); - connection.commit(); - - Thread.sleep(1000); - Testing.print("Entries in " + STREAM_NAME + ":" + jedis.xlen(STREAM_NAME)); - assertTrue(jedis.xlen(STREAM_NAME) < TOTAL_RECORDS); - - Thread.sleep(1000); - jedis.configSet("maxmemory", "0"); - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, TOTAL_RECORDS); - - long streamLength = jedis.xlen(STREAM_NAME); - assertTrue("Redis OOM Test Failed", streamLength == TOTAL_RECORDS); - } -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdIT.java deleted file mode 100644 index 1f7e4f9e..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdIT.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import static org.junit.Assert.assertTrue; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import io.debezium.connector.postgresql.connection.PostgresConnection; -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.quarkus.test.junit.TestProfile; - -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; - -@Disabled -@QuarkusIntegrationTest -@TestProfile(RedisStreamMemoryThresholdTestProfile.class) -@QuarkusTestResource(RedisTestResourceLifecycleManager.class) -public class RedisStreamMemoryThresholdIT { - - @Test - public void testRedisMemoryThreshold() throws Exception { - Testing.Print.enable(); - - Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - final String STREAM_NAME = "testc.inventory.redis_test2"; - final int TOTAL_RECORDS = 50; - - Testing.print("Setting Redis' maxmemory to 1M"); - jedis.configSet("maxmemory", "1M"); - - PostgresConnection connection = TestUtils.getPostgresConnection(); - connection.execute("CREATE TABLE inventory.redis_test2 " + - "(id VARCHAR(100) PRIMARY KEY, " + - "first_name VARCHAR(100), " + - "last_name VARCHAR(100))"); - connection.execute(String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " + - "SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", TOTAL_RECORDS)); - connection.commit(); - - Thread.sleep(1000); - Testing.print("Entries in " + STREAM_NAME + ":" + jedis.xlen(STREAM_NAME)); - assertTrue(jedis.xlen(STREAM_NAME) < TOTAL_RECORDS); - - Thread.sleep(1000); - jedis.configSet("maxmemory", "0"); - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, TOTAL_RECORDS); - - long streamLength = jedis.xlen(STREAM_NAME); - assertTrue("Redis Memory Threshold Test Failed", streamLength == TOTAL_RECORDS); - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java deleted file mode 100644 index 7126ac9c..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.util.Map; - -public class RedisStreamMemoryThresholdTestProfile extends RedisStreamTestProfile { - - @Override - public Map getConfigOverrides() { - Map config = super.getConfigOverrides(); - config.put("debezium.sink.redis.memory.threshold.percentage", "75"); - return config; - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageIT.java deleted file mode 100644 index 5bc5f22f..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageIT.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.List; -import java.util.Map; - -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.quarkus.test.junit.TestProfile; - -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.resps.StreamEntry; - -/** - * Integration tests that verify basic reading from PostgreSQL database and writing to Redis stream - * - * @author ggaborg - */ -@Disabled -@QuarkusIntegrationTest -@TestProfile(RedisStreamMessageTestProfile.class) -@QuarkusTestResource(RedisTestResourceLifecycleManager.class) -public class RedisStreamMessageIT { - - /** - * Verifies that all the records of a PostgreSQL table are streamed to Redis in extended message format - */ - @Test - public void testRedisStreamExtendedMessage() { - Testing.Print.enable(); - - Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); - final int MESSAGE_COUNT = 4; - final String STREAM_NAME = "testc.inventory.customers"; - - TestUtils.awaitStreamLengthGte(jedis, STREAM_NAME, MESSAGE_COUNT); - - long streamLength = jedis.xlen(STREAM_NAME); - assertEquals(MESSAGE_COUNT, streamLength, "Expected stream length of " + MESSAGE_COUNT); - - final List entries = jedis.xrange(STREAM_NAME, (StreamEntryID) null, (StreamEntryID) null); - for (StreamEntry entry : entries) { - Map map = entry.getFields(); - assertEquals(3, map.size(), "Expected map of size 3"); - assertTrue(map.get("key") != null && map.get("key").startsWith("{\"schema\":"), "Expected key's value starting with {\"schema\":..."); - assertTrue(map.get("value") != null && map.get("value").startsWith("{\"schema\":"), "Expected value's value starting with {\"schema\":..."); - assertTrue(map.containsKey("HEADERKEY")); - } - - jedis.close(); - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageTestProfile.java deleted file mode 100644 index 329b3414..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMessageTestProfile.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.util.Map; - -public class RedisStreamMessageTestProfile extends RedisStreamTestProfile { - - @Override - public Map getConfigOverrides() { - Map config = super.getConfigOverrides(); - config.put("debezium.sink.redis.message.format", "extended"); - return config; - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java deleted file mode 100644 index 2349d22d..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.nio.file.Path; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; -import io.debezium.util.Testing; -import io.quarkus.test.junit.QuarkusTestProfile; - -public class RedisStreamTestProfile implements QuarkusTestProfile { - - public static final String OFFSETS_FILE = "file-connector-offsets.txt"; - public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath(); - public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename"; - - @Override - public List testResources() { - return Arrays.asList(new TestResourceEntry(PostgresTestResourceLifecycleManager.class)); - } - - public Map getConfigOverrides() { - Map config = new HashMap(); - config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); - config.put("debezium.sink.redis.memory.threshold.percentage", "0"); - return config; - } - -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisTestResourceLifecycleManager.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisTestResourceLifecycleManager.java deleted file mode 100644 index 4c775464..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisTestResourceLifecycleManager.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.testcontainers.containers.GenericContainer; - -import io.debezium.server.TestConfigSource; -import io.debezium.util.Testing; -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; - -public class RedisTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager { - - static final String READY_MESSAGE = "Ready to accept connections"; - public static final int REDIS_PORT = 6379; - public static final String REDIS_IMAGE = "redis"; - - private static final AtomicBoolean running = new AtomicBoolean(false); - private static final GenericContainer container = new GenericContainer<>(REDIS_IMAGE) - .withExposedPorts(REDIS_PORT); - - private static synchronized void start(boolean ignored) { - if (!running.get()) { - container.start(); - TestUtils.waitBoolean(() -> container.getLogs().contains(READY_MESSAGE)); - running.set(true); - } - } - - @Override - public Map start() { - start(true); - Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); - Testing.Files.createTestingFile(TestConfigSource.OFFSET_STORE_PATH); - - Map params = new ConcurrentHashMap<>(); - params.put("debezium.sink.type", "redis"); - params.put("debezium.sink.redis.address", RedisTestResourceLifecycleManager.getRedisContainerAddress()); - params.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); - params.put("debezium.source.offset.flush.interval.ms", "0"); - params.put("debezium.source.topic.prefix", "testc"); - params.put("debezium.source.schema.include.list", "inventory"); - params.put("debezium.source.table.include.list", "inventory.customers,inventory.redis_test,inventory.redis_test2"); - params.put("debezium.transforms", "addheader"); - params.put("debezium.transforms.addheader.type", "org.apache.kafka.connect.transforms.InsertHeader"); - params.put("debezium.transforms.addheader.header", "headerKey"); - params.put("debezium.transforms.addheader.value.literal", "headerValue"); - - return params; - } - - @Override - public void stop() { - try { - container.stop(); - } - catch (Exception e) { - // ignored - } - running.set(false); - } - - public static void pause() { - container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec(); - } - - public static void unpause() { - container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec(); - } - - public static String getRedisContainerAddress() { - start(true); - - return String.format("%s:%d", container.getContainerIpAddress(), container.getMappedPort(REDIS_PORT)); - } -} diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestConstants.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestConstants.java similarity index 96% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestConstants.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestConstants.java index f26dbf46..ac8bd3d5 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestConstants.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestConstants.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis.wip; +package io.debezium.server.redis; public class TestConstants { private TestConstants() { diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersRedisTestBase.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersRedisTestBase.java similarity index 78% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersRedisTestBase.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersRedisTestBase.java index cbffdfd1..7962aa30 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersRedisTestBase.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersRedisTestBase.java @@ -3,16 +3,16 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis.wip; +package io.debezium.server.redis; -import static io.debezium.server.redis.wip.TestConstants.LOCALHOST; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_PASSWORD; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_USER; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_ROOT_PASSWORD; -import static io.debezium.server.redis.wip.TestConstants.REDIS_IMAGE; -import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT; -import static io.debezium.server.redis.wip.TestProperties.DEBEZIUM_SERVER_IMAGE; +import static io.debezium.server.redis.TestConstants.LOCALHOST; +import static io.debezium.server.redis.TestConstants.MYSQL_PORT; +import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_PASSWORD; +import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_USER; +import static io.debezium.server.redis.TestConstants.MYSQL_ROOT_PASSWORD; +import static io.debezium.server.redis.TestConstants.REDIS_IMAGE; +import static io.debezium.server.redis.TestConstants.REDIS_PORT; +import static io.debezium.server.redis.TestProperties.DEBEZIUM_SERVER_IMAGE; import java.time.Duration; import java.util.List; diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersSslStreamIT.java similarity index 80% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersSslStreamIT.java index fb0134f9..ea17d623 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersSslStreamIT.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersSslStreamIT.java @@ -3,15 +3,15 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis.wip; +package io.debezium.server.redis; -import static io.debezium.server.redis.wip.TestConstants.INITIAL_CUSTOMER_COUNT; -import static io.debezium.server.redis.wip.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE; -import static io.debezium.server.redis.wip.TestConstants.LOCALHOST; -import static io.debezium.server.redis.wip.TestConstants.REDIS_PORT; -import static io.debezium.server.redis.wip.TestConstants.REDIS_SSL_PORT; -import static io.debezium.server.redis.wip.TestUtils.insertCustomerToMySql; -import static io.debezium.server.redis.wip.TestUtils.waitForStreamLength; +import static io.debezium.server.redis.TestConstants.INITIAL_CUSTOMER_COUNT; +import static io.debezium.server.redis.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE; +import static io.debezium.server.redis.TestConstants.LOCALHOST; +import static io.debezium.server.redis.TestConstants.REDIS_PORT; +import static io.debezium.server.redis.TestConstants.REDIS_SSL_PORT; +import static io.debezium.server.redis.TestUtils.insertCustomerToMySql; +import static io.debezium.server.redis.TestUtils.waitForStreamLength; import java.io.IOException; @@ -20,8 +20,6 @@ import org.slf4j.LoggerFactory; import org.testcontainers.containers.BindMode; -import io.debezium.server.redis.TestUtils; - import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersStreamIT.java similarity index 94% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersStreamIT.java index af39e5f0..5f2356b9 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestContainersStreamIT.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestContainersStreamIT.java @@ -3,14 +3,14 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis.wip; - -import static io.debezium.server.redis.wip.TestConstants.INITIAL_CUSTOMER_COUNT; -import static io.debezium.server.redis.wip.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE; -import static io.debezium.server.redis.wip.TestUtils.awaitHashSizeGte; -import static io.debezium.server.redis.wip.TestUtils.getMySqlConnection; -import static io.debezium.server.redis.wip.TestUtils.insertCustomerToMySql; -import static io.debezium.server.redis.wip.TestUtils.waitForStreamLength; +package io.debezium.server.redis; + +import static io.debezium.server.redis.TestConstants.INITIAL_CUSTOMER_COUNT; +import static io.debezium.server.redis.TestConstants.INITIAL_SCHEMA_HISTORY_SIZE; +import static io.debezium.server.redis.TestUtils.awaitHashSizeGte; +import static io.debezium.server.redis.TestUtils.getMySqlConnection; +import static io.debezium.server.redis.TestUtils.insertCustomerToMySql; +import static io.debezium.server.redis.TestUtils.waitForStreamLength; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestProperties.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestProperties.java similarity index 70% rename from debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestProperties.java rename to debezium-server-redis/src/test/java/io/debezium/server/redis/TestProperties.java index 341eaf26..2cab56e8 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestProperties.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestProperties.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis.wip; +package io.debezium.server.redis; public final class TestProperties { private TestProperties() { @@ -12,5 +12,6 @@ private TestProperties() { public static final String DEBEZIUM_VERSION = System.getProperty("test.version.debezium"); public static final String DEBEZIUM_SERVER_IMAGE_GROUP = System.getProperty("test.server.image.group"); - public static final String DEBEZIUM_SERVER_IMAGE = DEBEZIUM_SERVER_IMAGE_GROUP + "/debezium-server-redis:" + DEBEZIUM_VERSION; + private static final String DEBEZIUM_SERVER_IMAGE_NAME = System.getProperty("test.server.image.name"); + public static final String DEBEZIUM_SERVER_IMAGE = DEBEZIUM_SERVER_IMAGE_GROUP + "/" + DEBEZIUM_SERVER_IMAGE_NAME + ":" + DEBEZIUM_VERSION; } diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java index aec9a6d1..d395f29f 100644 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java +++ b/debezium-server-redis/src/test/java/io/debezium/server/redis/TestUtils.java @@ -5,45 +5,54 @@ */ package io.debezium.server.redis; -import java.time.Duration; -import java.util.function.Supplier; +import static io.debezium.server.redis.TestConstants.LOCALHOST; +import static io.debezium.server.redis.TestConstants.MYSQL_DATABASE; +import static io.debezium.server.redis.TestConstants.MYSQL_PORT; +import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_PASSWORD; +import static io.debezium.server.redis.TestConstants.MYSQL_PRIVILEGED_USER; +import static io.debezium.server.redis.TestConstants.MYSQL_ROOT_PASSWORD; +import static org.awaitility.Awaitility.await; -import org.awaitility.Awaitility; +import java.io.IOException; +import java.util.concurrent.TimeUnit; -import io.debezium.connector.postgresql.connection.PostgresConnection; -import io.debezium.jdbc.JdbcConfiguration; -import io.debezium.server.TestConfigSource; -import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; +import io.debezium.config.Configuration; +import io.debezium.connector.mysql.strategy.mysql.MySqlConnection; +import io.debezium.connector.mysql.strategy.mysql.MySqlConnectionConfiguration; import redis.clients.jedis.Jedis; public class TestUtils { - private TestUtils() { + public static void insertCustomerToMySql(DebeziumTestContainerWrapper container, String firstName, String lastName, String email) + throws IOException, InterruptedException { + container.execInContainer("mysql", + "-u", MYSQL_PRIVILEGED_USER, + "-p" + MYSQL_PRIVILEGED_PASSWORD, + "-D", MYSQL_DATABASE, + "-e", "INSERT INTO customers VALUES (default,'" + firstName + "','" + lastName + "','" + email + "')"); } - public static PostgresConnection getPostgresConnection() { - return new PostgresConnection(JdbcConfiguration.create() - .with("user", PostgresTestResourceLifecycleManager.POSTGRES_USER) - .with("password", PostgresTestResourceLifecycleManager.POSTGRES_PASSWORD) - .with("dbname", PostgresTestResourceLifecycleManager.POSTGRES_DBNAME) - .with("hostname", PostgresTestResourceLifecycleManager.POSTGRES_HOST) - .with("port", PostgresTestResourceLifecycleManager.getContainer().getMappedPort(PostgresTestResourceLifecycleManager.POSTGRES_PORT)) - .build(), "Debezium Redis Test"); + public static MySqlConnection getMySqlConnection(DebeziumTestContainerWrapper container) { + return new MySqlConnection(new MySqlConnectionConfiguration(Configuration.create() + .with("database.user", "root") + .with("database.password", MYSQL_ROOT_PASSWORD) + .with("database.dbname", MYSQL_DATABASE) + .with("database.hostname", LOCALHOST) + .with("database.port", container.getMappedPort(MYSQL_PORT)) + .build()), null); } - public static void awaitStreamLengthGte(Jedis jedis, String streamName, int expectedLength) { - waitBoolean(() -> jedis.xlen(streamName) >= expectedLength); + public static void waitForStreamLength(Jedis jedis, String streamName, int expectedLength) { + await() + .atMost(60, TimeUnit.SECONDS) + .until(() -> jedis.xlen(streamName) == expectedLength); } public static void awaitHashSizeGte(Jedis jedis, String hashName, int expectedSize) { - waitBoolean(() -> jedis.hgetAll(hashName).size() >= expectedSize); - } - - public static void waitBoolean(Supplier bool) { - Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> { - return Boolean.TRUE.equals(bool.get()); - }); + await() + .atMost(60, TimeUnit.SECONDS) + .until(() -> jedis.hgetAll(hashName).size() >= expectedSize); } } diff --git a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java b/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java deleted file mode 100644 index 5aa2526b..00000000 --- a/debezium-server-redis/src/test/java/io/debezium/server/redis/wip/TestUtils.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.server.redis.wip; - -import static io.debezium.server.redis.wip.TestConstants.LOCALHOST; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_DATABASE; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PORT; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_PASSWORD; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_PRIVILEGED_USER; -import static io.debezium.server.redis.wip.TestConstants.MYSQL_ROOT_PASSWORD; -import static org.awaitility.Awaitility.await; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import io.debezium.config.Configuration; -import io.debezium.connector.mysql.strategy.mysql.MySqlConnection; -import io.debezium.connector.mysql.strategy.mysql.MySqlConnectionConfiguration; - -import redis.clients.jedis.Jedis; - -public class TestUtils { - - public static void insertCustomerToMySql(DebeziumTestContainerWrapper container, String firstName, String lastName, String email) - throws IOException, InterruptedException { - container.execInContainer("mysql", - "-u", MYSQL_PRIVILEGED_USER, - "-p" + MYSQL_PRIVILEGED_PASSWORD, - "-D", MYSQL_DATABASE, - "-e", "INSERT INTO customers VALUES (default,'" + firstName + "','" + lastName + "','" + email + "')"); - } - - public static MySqlConnection getMySqlConnection(DebeziumTestContainerWrapper container) { - return new MySqlConnection(new MySqlConnectionConfiguration(Configuration.create() - .with("database.user", "root") - .with("database.password", MYSQL_ROOT_PASSWORD) - .with("database.dbname", MYSQL_DATABASE) - .with("database.hostname", LOCALHOST) - .with("database.port", container.getMappedPort(MYSQL_PORT)) - .build()), null); - } - - public static void waitForStreamLength(Jedis jedis, String streamName, int expectedLength) { - await() - .atMost(60, TimeUnit.SECONDS) - .until(() -> jedis.xlen(streamName) == expectedLength); - } - - public static void awaitHashSizeGte(Jedis jedis, String hashName, int expectedSize) { - await() - .atMost(60, TimeUnit.SECONDS) - .until(() -> jedis.hgetAll(hashName).size() >= expectedSize); - } - -} diff --git a/pom.xml b/pom.xml index 1fac7523..346a5003 100644 --- a/pom.xml +++ b/pom.xml @@ -23,8 +23,6 @@ ${project.version} - true - ${artifactId}