Merge pull request #338 from Aiven-Open/add-intg-tests
Updating tests with distributed mode
aindriu-aiven authored Nov 12, 2024
2 parents 6d10696 + 8085866 commit 3bd8a17
Showing 2 changed files with 88 additions and 22 deletions.
IntegrationBase.java
@@ -20,13 +20,16 @@

import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -47,7 +50,10 @@
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.TestInfo;
@@ -56,9 +62,9 @@
import org.testcontainers.utility.DockerImageName;

public interface IntegrationBase {

String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
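// Jackson mapper used to deserialize the JSON values read from the offset-storage topic.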
ObjectMapper OBJECT_MAPPER = new ObjectMapper();

default AdminClient newAdminClient(final String bootstrapServers) {
final Properties adminClientConfig = new Properties();
@@ -76,9 +82,9 @@ static void extractConnectorPlugin(File pluginDir) throws IOException, Interrupt
}

static File getPluginDir() throws IOException {
final File testDir = Files.createTempDirectory(S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile();
final File testDir = Files.createTempDirectory(S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile();

final File pluginDir = new File(testDir, PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA);
final File pluginDir = new File(testDir, PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA);
assert pluginDir.mkdirs();
return pluginDir;
}
@@ -202,4 +208,33 @@ static List<JsonNode> consumeJsonMessages(final String topic, final int expected
return recordsList;
}
}

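// Consumes records from the Connect offset-storage topic and merges each record's JSON value into a
// single map, polling until at least expectedMessageCount offset entries have been collected. Tests
// use this to verify which source offsets the connector has committed.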
static Map<String, Object> consumeOffsetStorageMessages(final String topic, final int expectedMessageCount,
final String bootstrapServer) throws ConnectException {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final Map<String, Object> messages = new HashMap<>();
consumer.subscribe(Collections.singletonList(topic));

// Poll messages from the topic
while (messages.size() < expectedMessageCount) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(5L));
for (final ConsumerRecord<byte[], byte[]> record : records) {
messages.putAll(OBJECT_MAPPER.readValue(new String(record.value(), StandardCharsets.UTF_8), // NOPMD
new TypeReference<>() { // NOPMD
}));
}
}
return messages;

} catch (JsonProcessingException e) {
throw new ConnectException("Error while consuming messages " + e.getMessage());
}
}
}
IntegrationTest.java
@@ -16,6 +16,7 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_ENDPOINT_CONFIG;
@@ -29,19 +30,23 @@
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPIC_PARTITIONS;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.VALUE_CONVERTER_SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.VALUE_SERIALIZER;
import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
@@ -71,6 +76,7 @@
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@SuppressWarnings("PMD.ExcessiveImports")
final class IntegrationTest implements IntegrationBase {

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTest.class);
@@ -154,12 +160,14 @@ void bytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedEx
final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
final String testData2 = "Hello, Kafka Connect S3 Source! object 2";

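// Collect the offset key returned by writeToS3 for each uploaded object so the committed offsets
// can be verified at the end of the test.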
final Set<String> offsetKeys = new HashSet<>();

// write 4 data objects to S3 (two per partition) plus one empty object
writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000");
writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000");
writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001");
writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001");
writeToS3(topicName, new byte[0], "00003"); // this should be ignored.
offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000"));
offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000"));
offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001"));
offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001"));
offsetKeys.add(writeToS3(topicName, new byte[0], "00003"));
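// The empty object written for partition 00003 should be ignored by the connector, so only 4
// committed offsets are expected below.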

final List<String> objects = testBucketAccessor.listObjects();
assertThat(objects.size()).isEqualTo(5);
@@ -172,6 +180,9 @@

// Verify that the correct data is read from the S3 bucket and pushed to Kafka
assertThat(records).contains(testData1).contains(testData2);

// Verify offset positions
verifyOffsetPositions(offsetKeys, 4);
}

@Test
@@ -188,7 +199,9 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo)

connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);

writeToS3(topicName, testData.getBytes(StandardCharsets.UTF_8), "00000");
final Set<String> offsetKeys = new HashSet<>();

offsetKeys.add(writeToS3(topicName, testData.getBytes(StandardCharsets.UTF_8), "00000"));

// Poll messages from the Kafka topic and verify the consumed data
final List<String> records = IntegrationBase.consumeMessages(topicName, 5, connectRunner.getBootstrapServers());
@@ -200,6 +213,9 @@
assertThat(records.get(2)).isEqualTo("CC");
assertThat(records.get(3)).isEqualTo("DD");
assertThat(records.get(4)).isEqualTo("EE");

// Verify offset positions
verifyOffsetPositions(offsetKeys, 1);
}

@Test
@@ -224,12 +240,13 @@ void avroTest(final TestInfo testInfo) throws ExecutionException, InterruptedExc
final ByteArrayOutputStream outputStream1 = getAvroRecord(schema, 1, 100);
final ByteArrayOutputStream outputStream2 = getAvroRecord(schema, 2, 100);

writeToS3(topicName, outputStream1.toByteArray(), "00001");
writeToS3(topicName, outputStream2.toByteArray(), "00001");
final Set<String> offsetKeys = new HashSet<>();

writeToS3(topicName, outputStream1.toByteArray(), "00002");
writeToS3(topicName, outputStream2.toByteArray(), "00002");
writeToS3(topicName, outputStream2.toByteArray(), "00002");
offsetKeys.add(writeToS3(topicName, outputStream1.toByteArray(), "00001"));
offsetKeys.add(writeToS3(topicName, outputStream2.toByteArray(), "00001"));
offsetKeys.add(writeToS3(topicName, outputStream1.toByteArray(), "00002"));
offsetKeys.add(writeToS3(topicName, outputStream2.toByteArray(), "00002"));
offsetKeys.add(writeToS3(topicName, outputStream2.toByteArray(), "00002"));
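// writeToS3 puts a timestamp into each object key, so all five uploads (including the repeated
// payload for partition 00002) are distinct objects and should each produce a committed offset.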

final List<String> objects = testBucketAccessor.listObjects();
assertThat(objects.size()).isEqualTo(5);
@@ -247,10 +264,13 @@ void avroTest(final TestInfo testInfo) throws ExecutionException, InterruptedExc
.contains("Hello, Kafka Connect S3 Source! object 1")
.contains("Hello, Kafka Connect S3 Source! object 2");
assertThat(records).extracting(record -> record.get("id").toString()).contains("1").contains("2");

// Verify offset positions
verifyOffsetPositions(offsetKeys, 5);
}

@Test
void parquetTest(final TestInfo testInfo) throws ExecutionException, InterruptedException, IOException {
void parquetTest(final TestInfo testInfo) throws IOException {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);
connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.PARQUET.getValue());
@@ -282,7 +302,7 @@ void parquetTest(final TestInfo testInfo) throws ExecutionException, Interrupted
}

@Test
void jsonTest(final TestInfo testInfo) throws ExecutionException, InterruptedException, IOException {
void jsonTest(final TestInfo testInfo) throws IOException {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);
connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue());
@@ -296,15 +316,18 @@ void jsonTest(final TestInfo testInfo) throws ExecutionException, InterruptedExc
jsonBuilder.append(jsonContent).append("\n"); // NOPMD
}
final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8);
final Set<String> offsetKeys = new HashSet<>();

writeToS3(topicName, jsonBytes, "00001");

offsetKeys.add(writeToS3(topicName, jsonBytes, "00001"));
// Poll Json messages from the Kafka topic and deserialize them
final List<JsonNode> records = IntegrationBase.consumeJsonMessages(topicName, 500,
connectRunner.getBootstrapServers());

assertThat(records).extracting(record -> record.get("payload").get("message").asText()).contains(testMessage);
assertThat(records).extracting(record -> record.get("payload").get("id").asText()).contains("1");

// Verify offset positions
verifyOffsetPositions(offsetKeys, 1);
}

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
@@ -332,15 +355,17 @@ private static ByteArrayOutputStream getAvroRecord(final Schema schema, final in
return outputStream;
}

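// Uploads the payload to S3 and returns the offset key (OBJECT_KEY + SEPARATOR + the S3 object key)
// that the connector is expected to record for this object.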
private static void writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId)
private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId)
throws IOException {
final String filePrefix = topicName + "-" + partitionId + "-" + System.currentTimeMillis();
final String fileSuffix = ".txt";

final Path testFilePath = File.createTempFile(filePrefix, fileSuffix).toPath();
final String objectKey = filePrefix + fileSuffix;
try {
Files.write(testFilePath, testDataBytes);
saveToS3(TEST_BUCKET_NAME, "", filePrefix + fileSuffix, testFilePath.toFile());
saveToS3(TEST_BUCKET_NAME, "", objectKey, testFilePath.toFile());
return OBJECT_KEY + SEPARATOR + objectKey;
} finally {
Files.delete(testFilePath);
}
@@ -372,4 +397,10 @@ public static void saveToS3(final String bucketName, final String folderName, fi
final PutObjectRequest request = new PutObjectRequest(bucketName, folderName + fileNameInS3, fileToWrite);
s3Client.putObject(request);
}

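// Reads the connector's offset-storage topic and asserts that exactly messagesCount offsets were
// committed and that their keys are a subset of the offset keys written by the test.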
private void verifyOffsetPositions(final Set<String> offsetKeys, final int messagesCount) throws ConnectException {
final Map<String, Object> offsetRecs = IntegrationBase.consumeOffsetStorageMessages(
"connect-offset-topic-" + CONNECTOR_NAME, messagesCount, connectRunner.getBootstrapServers());
assertThat(offsetRecs.keySet()).hasSize(messagesCount).isSubsetOf(offsetKeys);
}
}
