Updating tests with distributed mode #338

Merged: 3 commits, Nov 12, 2024

IntegrationBase.java
@@ -20,13 +20,16 @@

import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -47,7 +50,10 @@
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.TestInfo;
@@ -56,9 +62,9 @@
import org.testcontainers.utility.DockerImageName;

public interface IntegrationBase {

String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
ObjectMapper OBJECT_MAPPER = new ObjectMapper();

default AdminClient newAdminClient(final String bootstrapServers) {
final Properties adminClientConfig = new Properties();
@@ -76,9 +82,9 @@ static void extractConnectorPlugin(File pluginDir) throws IOException, Interrupt
}

static File getPluginDir() throws IOException {
final File testDir = Files.createTempDirectory(S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile();
final File testDir = Files.createTempDirectory(S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST).toFile();

final File pluginDir = new File(testDir, PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA);
final File pluginDir = new File(testDir, PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA);
assert pluginDir.mkdirs();
return pluginDir;
}
@@ -202,4 +208,33 @@ static List<JsonNode> consumeJsonMessages(final String topic, final int expected
return recordsList;
}
}

static Map<String, Object> consumeOffsetStorageMessages(final String topic, final int expectedMessageCount,
final String bootstrapServer) throws ConnectException {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final Map<String, Object> messages = new HashMap<>();
consumer.subscribe(Collections.singletonList(topic));

// Poll messages from the topic
while (messages.size() < expectedMessageCount) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(5L);
for (final ConsumerRecord<byte[], byte[]> record : records) {
messages.putAll(OBJECT_MAPPER.readValue(new String(record.value(), StandardCharsets.UTF_8), // NOPMD
new TypeReference<>() { // NOPMD
}));
}
}
return messages;

} catch (JsonProcessingException e) {
throw new ConnectException("Error while consuming messages " + e.getMessage());
}
}
}
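
The new consumeOffsetStorageMessages helper above polls the connector's offset-storage topic and merges every record value, deserialized as a JSON object, into one map that the tests can assert against. As a rough illustration of what it consumes, the snippet below parses a single offset record value the same way the helper does; the object key, the separator, and the numeric offset are assumptions for the example, not values taken from this PR.

import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OffsetRecordValueSketch {
    public static void main(final String[] args) throws Exception {
        // Hypothetical offset-storage record value: one entry per processed S3 object,
        // keyed by OBJECT_KEY + SEPARATOR + <object key>. The "_" separator and the
        // value 1 are assumptions for illustration only.
        final String recordValue = "{\"object_key_bytesTest-00000-1731400000000.txt\": 1}";
        final Map<String, Object> offsets = new ObjectMapper().readValue(recordValue,
                new TypeReference<Map<String, Object>>() {
                });
        // The helper above does messages.putAll(offsets) per record until it has
        // collected the expected number of entries.
        System.out.println(offsets);
    }
}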

IntegrationTest.java
@@ -16,6 +16,7 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_ENDPOINT_CONFIG;
@@ -29,19 +30,23 @@
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPIC_PARTITIONS;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.VALUE_CONVERTER_SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.VALUE_SERIALIZER;
import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
@@ -71,6 +76,7 @@
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@SuppressWarnings("PMD.ExcessiveImports")
final class IntegrationTest implements IntegrationBase {

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTest.class);
@@ -154,12 +160,14 @@ void bytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedEx
final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
final String testData2 = "Hello, Kafka Connect S3 Source! object 2";

final Set<String> offsetKeys = new HashSet<>();

// write 2 objects to s3
writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000");
writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000");
writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001");
writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001");
writeToS3(topicName, new byte[0], "00003"); // this should be ignored.
offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000"));
offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000"));
offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001"));
offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001"));
offsetKeys.add(writeToS3(topicName, new byte[0], "00003"));

final List<String> objects = testBucketAccessor.listObjects();
assertThat(objects.size()).isEqualTo(5);
@@ -172,6 +180,9 @@

// Verify that the correct data is read from the S3 bucket and pushed to Kafka
assertThat(records).contains(testData1).contains(testData2);

// Verify offset positions
verifyOffsetPositions(offsetKeys, 4);
}

@Test
@@ -188,7 +199,9 @@ void bytesTestBasedOnMaxMessageBytes(final TestInfo testInfo)

connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);

writeToS3(topicName, testData.getBytes(StandardCharsets.UTF_8), "00000");
final Set<String> offsetKeys = new HashSet<>();

offsetKeys.add(writeToS3(topicName, testData.getBytes(StandardCharsets.UTF_8), "00000"));

// Poll messages from the Kafka topic and verify the consumed data
final List<String> records = IntegrationBase.consumeMessages(topicName, 5, connectRunner.getBootstrapServers());
@@ -200,6 +213,9 @@
assertThat(records.get(2)).isEqualTo("CC");
assertThat(records.get(3)).isEqualTo("DD");
assertThat(records.get(4)).isEqualTo("EE");

// Verify offset positions
verifyOffsetPositions(offsetKeys, 1);
}

@Test
@@ -224,12 +240,13 @@ void avroTest(final TestInfo testInfo) throws ExecutionException, InterruptedExc
final ByteArrayOutputStream outputStream1 = getAvroRecord(schema, 1, 100);
final ByteArrayOutputStream outputStream2 = getAvroRecord(schema, 2, 100);

writeToS3(topicName, outputStream1.toByteArray(), "00001");
writeToS3(topicName, outputStream2.toByteArray(), "00001");
final Set<String> offsetKeys = new HashSet<>();

writeToS3(topicName, outputStream1.toByteArray(), "00002");
writeToS3(topicName, outputStream2.toByteArray(), "00002");
writeToS3(topicName, outputStream2.toByteArray(), "00002");
offsetKeys.add(writeToS3(topicName, outputStream1.toByteArray(), "00001"));
offsetKeys.add(writeToS3(topicName, outputStream2.toByteArray(), "00001"));
offsetKeys.add(writeToS3(topicName, outputStream1.toByteArray(), "00002"));
offsetKeys.add(writeToS3(topicName, outputStream2.toByteArray(), "00002"));
offsetKeys.add(writeToS3(topicName, outputStream2.toByteArray(), "00002"));

final List<String> objects = testBucketAccessor.listObjects();
assertThat(objects.size()).isEqualTo(5);
@@ -247,10 +264,13 @@ void avroTest(final TestInfo testInfo) throws ExecutionException, InterruptedExc
.contains("Hello, Kafka Connect S3 Source! object 1")
.contains("Hello, Kafka Connect S3 Source! object 2");
assertThat(records).extracting(record -> record.get("id").toString()).contains("1").contains("2");

// Verify offset positions
verifyOffsetPositions(offsetKeys, 5);
}

@Test
void parquetTest(final TestInfo testInfo) throws ExecutionException, InterruptedException, IOException {
void parquetTest(final TestInfo testInfo) throws IOException {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);
connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.PARQUET.getValue());
@@ -282,7 +302,7 @@ void parquetTest(final TestInfo testInfo) throws ExecutionException, Interrupted
}

@Test
void jsonTest(final TestInfo testInfo) throws ExecutionException, InterruptedException, IOException {
void jsonTest(final TestInfo testInfo) throws IOException {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);
connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue());
@@ -296,15 +316,18 @@ void jsonTest(final TestInfo testInfo) throws ExecutionException, InterruptedExc
jsonBuilder.append(jsonContent).append("\n"); // NOPMD
}
final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8);
final Set<String> offsetKeys = new HashSet<>();

writeToS3(topicName, jsonBytes, "00001");

offsetKeys.add(writeToS3(topicName, jsonBytes, "00001"));
// Poll Json messages from the Kafka topic and deserialize them
final List<JsonNode> records = IntegrationBase.consumeJsonMessages(topicName, 500,
connectRunner.getBootstrapServers());

assertThat(records).extracting(record -> record.get("payload").get("message").asText()).contains(testMessage);
assertThat(records).extracting(record -> record.get("payload").get("id").asText()).contains("1");

// Verify offset positions
verifyOffsetPositions(offsetKeys, 1);
}

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
@@ -332,15 +355,17 @@ private static ByteArrayOutputStream getAvroRecord(final Schema schema, final in
return outputStream;
}

private static void writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId)
private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId)
throws IOException {
final String filePrefix = topicName + "-" + partitionId + "-" + System.currentTimeMillis();
final String fileSuffix = ".txt";

final Path testFilePath = File.createTempFile(filePrefix, fileSuffix).toPath();
final String objectKey = filePrefix + fileSuffix;
try {
Files.write(testFilePath, testDataBytes);
saveToS3(TEST_BUCKET_NAME, "", filePrefix + fileSuffix, testFilePath.toFile());
saveToS3(TEST_BUCKET_NAME, "", objectKey, testFilePath.toFile());
return OBJECT_KEY + SEPARATOR + objectKey;
} finally {
Files.delete(testFilePath);
}
@@ -372,4 +397,10 @@ public static void saveToS3(final String bucketName, final String folderName, fi
final PutObjectRequest request = new PutObjectRequest(bucketName, folderName + fileNameInS3, fileToWrite);
s3Client.putObject(request);
}

private void verifyOffsetPositions(final Set<String> offsetKeys, final int messagesCount) throws ConnectException {
final Map<String, Object> offsetRecs = IntegrationBase.consumeOffsetStorageMessages(
"connect-offset-topic-" + CONNECTOR_NAME, messagesCount, connectRunner.getBootstrapServers());
assertThat(offsetRecs.keySet()).hasSize(messagesCount).isSubsetOf(offsetKeys);
}
}
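
verifyOffsetPositions reads the topic "connect-offset-topic-" + CONNECTOR_NAME, i.e. the offset-storage topic of a Connect worker running in distributed mode, which is where a distributed worker persists source-connector offsets. A minimal sketch of the worker settings such a setup assumes is below; the property names are standard Kafka Connect distributed-worker configuration, but every value is illustrative, since the actual setup lives in the ConnectRunner and is not part of this diff.

# Illustrative distributed-worker configuration only; actual values come from the
# ConnectRunner used by these integration tests and are not shown in this diff.
bootstrap.servers=localhost:9092
group.id=connect-integration-test-group
offset.storage.topic=connect-offset-topic-<connector-name>
offset.storage.replication.factor=1
config.storage.topic=connect-config-topic-<connector-name>
config.storage.replication.factor=1
status.storage.topic=connect-status-topic-<connector-name>
status.storage.replication.factor=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/tmp/plugins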