
Commit

KAFKA-428: Mark the copy complete in the source offset for the last copied document (#168)
Calvinnix authored Oct 21, 2024
1 parent d477f00 commit e4cda27
Showing 2 changed files with 183 additions and 64 deletions.
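
In brief, this commit changes the source offset written for the final record of the copy-existing phase: instead of carrying the "copy" flag like every other copied record, the last record's offset drops the flag and sets "_id" to the cached change stream resume token, so a restart after the copy finishes resumes the stream rather than re-copying. A minimal sketch of the two offset shapes, using placeholder token values rather than real resume tokens:

import java.util.HashMap;
import java.util.Map;

class CopyOffsetShapes {
  public static void main(String[] args) {
    // Offset emitted for every copied record except the last one: the "copy" flag
    // tells a restarted task that the initial copy was still in progress.
    Map<String, String> midCopyOffset = new HashMap<>();
    midCopyOffset.put("_id", "{\"_data\": \"<placeholder copy-phase token>\"}");
    midCopyOffset.put("copy", "true");

    // Offset emitted for the last copied record after this commit: no "copy" flag,
    // and "_id" holds the cached change stream resume token.
    Map<String, String> copyCompleteOffset = new HashMap<>();
    copyCompleteOffset.put("_id", "{\"_data\": \"<placeholder resume token>\"}");

    System.out.println("mid-copy offset:      " + midCopyOffset);
    System.out.println("copy-complete offset: " + copyCompleteOffset);
  }
}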
@@ -36,6 +36,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -169,6 +170,9 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
new HashMap<String, String>() {
{
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
String namespaceRegex =
String.format("(%s\\.coll|%s\\.coll)", db1.getName(), db2.getName());
put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
}
};
task.start(cfg);
@@ -178,11 +182,22 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
() -> assertEquals(150, firstPoll.size()),
() -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll1),
() -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll2),
// make sure all elements, except the last, contain the "copy" key
() ->
assertTrue(
firstPoll.stream()
.map(SourceRecord::sourceOffset)
.allMatch(i -> i.containsKey("copy"))));
.limit(150 - 1) // exclude the last record
.allMatch(i -> i.containsKey("copy"))),
// make sure that the last copied element does not have the "copy" key
() ->
assertTrue(
firstPoll.stream()
.map(SourceRecord::sourceOffset)
.skip(150 - 1) // exclude the last record
.findFirst()
.filter(i -> !i.containsKey("copy"))
.isPresent()));

assertNull(task.poll());

@@ -533,6 +548,7 @@ void testSourceCanUseCustomOffsetPartitionNames() {
@Test
@DisplayName("Copy existing with a restart midway through")
void testCopyingExistingWithARestartMidwayThrough() {
assumeTrue(isGreaterThanThreeDotSix());
try (AutoCloseableSourceTask task = createSourceTask()) {

MongoCollection<Document> coll = getCollection();
@@ -544,7 +560,7 @@ void testCopyingExistingWithARestartMidwayThrough() {
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "10000");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
}
};

@@ -556,6 +572,17 @@ void testCopyingExistingWithARestartMidwayThrough() {
assertTrue(
firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));

Map<String, ?> lastOffset = firstPoll.get(25 - 1).sourceOffset();

// mock the context so that on restart we know where the last task left off
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
task.initialize(context);

// perform a restart
task.stop();
task.start(cfg);

@@ -566,8 +593,20 @@

List<SourceRecord> thirdPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(26, 50), thirdPoll, coll);
// Make sure all elements, except the last one, contain the "copy" key
assertTrue(
thirdPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
thirdPoll.stream()
.map(SourceRecord::sourceOffset)
.limit(25 - 1) // exclude the last record in the batch
.allMatch(i -> i.containsKey("copy")));
// Make sure the last copied element does not contain the "copy" key
assertTrue(
thirdPoll.stream()
.map(SourceRecord::sourceOffset)
.skip(25 - 1) // exclude the last record in the batch
.findFirst()
.filter(i -> !i.containsKey("copy"))
.isPresent());

assertTrue(getNextBatch(task).isEmpty());
insertMany(rangeClosed(51, 75), coll);
@@ -579,6 +618,72 @@ void testCopyingExistingWithARestartMidwayThrough() {
}
}

@Test
@DisplayName("Copy existing with a restart after finishing")
void testCopyingExistingWithARestartAfterFinishing() {
assumeTrue(isGreaterThanThreeDotSix());
try (AutoCloseableSourceTask task = createSourceTask()) {

MongoCollection<Document> coll = getCollection();

HashMap<String, String> cfg =
new HashMap<String, String>() {
{
put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
}
};

insertMany(rangeClosed(1, 50), coll);
task.start(cfg);

List<SourceRecord> firstPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(1, 25), firstPoll, coll);
assertTrue(
firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));

List<SourceRecord> secondPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(26, 50), secondPoll, coll);
// Make sure all elements, except the last one, contain the "copy" key
assertTrue(
secondPoll.stream()
.map(SourceRecord::sourceOffset)
.limit(25 - 1) // exclude the last record in the batch
.allMatch(i -> i.containsKey("copy")));

Map<String, ?> lastOffset = secondPoll.get(25 - 1).sourceOffset();

// Make sure the last copied element does not contain the "copy" key
assertFalse(lastOffset.containsKey("copy"));

// mock the context so that on restart we know where the last task left off
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
task.initialize(context);

// perform a restart
task.stop();
task.start(cfg);

// make sure that a copy doesn't occur again because all data was already copied
assertTrue(getNextBatch(task).isEmpty());

// make sure that we can continue to process data
insertMany(rangeClosed(51, 75), coll);

List<SourceRecord> thirdPoll = getNextBatch(task);
assertSourceRecordValues(createInserts(51, 75), thirdPoll, coll);
assertFalse(
thirdPoll.stream().map(SourceRecord::sourceOffset).anyMatch(i -> i.containsKey("copy")));
}
}

@Test
@DisplayName("Ensure source loads data from collection and outputs documents only")
void testSourceLoadsDataFromCollectionDocumentOnly() {
@@ -48,6 +48,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -210,68 +211,81 @@ private List<SourceRecord> pollInternal() {
createValueSchemaAndValueProvider(sourceConfig);

List<SourceRecord> sourceRecords = new ArrayList<>();
getNextBatch()
.forEach(
changeStreamDocument -> {
Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
if (isCopying) {
sourceOffset.put(COPY_KEY, "true");
}
Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
while (batchIterator.hasNext()) {
BsonDocument changeStreamDocument = batchIterator.next();
Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
if (isCopying) {
sourceOffset.put(COPY_KEY, "true");
}

String topicName = topicMapper.getTopic(changeStreamDocument);
if (topicName.isEmpty()) {
LOGGER.warn(
"No topic set. Could not publish the message: {}",
changeStreamDocument.toJson());
} else {

Optional<BsonDocument> valueDocument = Optional.empty();

boolean isTombstoneEvent =
publishFullDocumentOnlyTombstoneOnDelete
&& !changeStreamDocument.containsKey(FULL_DOCUMENT);
if (publishFullDocumentOnly) {
if (changeStreamDocument.containsKey(FULL_DOCUMENT)
&& changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
}
} else {
valueDocument = Optional.of(changeStreamDocument);
}

if (valueDocument.isPresent() || isTombstoneEvent) {
BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);

if (valueDoc instanceof RawBsonDocument) {
int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
}

BsonDocument keyDocument;
if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
keyDocument = changeStreamDocument;
} else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
&& changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
} else {
keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
}

createSourceRecord(
keySchemaAndValueProducer,
isTombstoneEvent
? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
: valueSchemaAndValueProducer,
sourceOffset,
topicName,
keyDocument,
valueDoc)
.map(sourceRecords::add);
}
}
});
// If isCopying is true, we set the COPY_KEY flag so that Kafka has context that a copy is in
// progress. However, for the last document that we copy, we should not set this flag because
// the copy has completed; otherwise we would be relying on future change stream events to
// signal that we are no longer copying. We also need to set the _id field to a valid resume
// token, which during copying is held in the cachedResumeToken variable. In MongoDB 3.6 the
// cachedResumeToken initializes to null, so we guard against a NullPointerException here.
boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy && cachedResumeToken != null) {
sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
sourceOffset.remove(COPY_KEY);
}

String topicName = topicMapper.getTopic(changeStreamDocument);
if (topicName.isEmpty()) {
LOGGER.warn(
"No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
} else {

Optional<BsonDocument> valueDocument = Optional.empty();

boolean isTombstoneEvent =
publishFullDocumentOnlyTombstoneOnDelete
&& !changeStreamDocument.containsKey(FULL_DOCUMENT);
if (publishFullDocumentOnly) {
if (changeStreamDocument.containsKey(FULL_DOCUMENT)
&& changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
}
} else {
valueDocument = Optional.of(changeStreamDocument);
}

if (valueDocument.isPresent() || isTombstoneEvent) {
BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);

if (valueDoc instanceof RawBsonDocument) {
int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
}

BsonDocument keyDocument;
if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
keyDocument = changeStreamDocument;
} else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
&& changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
} else {
keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
}

createSourceRecord(
keySchemaAndValueProducer,
isTombstoneEvent
? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
: valueSchemaAndValueProducer,
sourceOffset,
topicName,
keyDocument,
valueDoc)
.map(sourceRecords::add);
}
}
}
LOGGER.debug("Return batch of {}", sourceRecords.size());

if (sourceRecords.isEmpty()) {
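
The restart behaviour exercised by the two new tests above can be summed up by how a stored offset is interpreted on startup. The helper below is a hypothetical illustration (not part of the connector), assuming the offset map has been read back through Kafka Connect's OffsetStorageReader as the tests mock it:

import java.util.Map;

final class CopyOffsetSemantics {
  private CopyOffsetSemantics() {}

  /**
   * Hypothetical illustration of how a restarted task can read the offsets this commit
   * writes: an offset that still carries the "copy" key means the initial copy was
   * interrupted and must be re-run; an offset without it holds a valid change stream
   * resume token in "_id", so the copy phase can be skipped and streaming resumes.
   */
  static boolean copyCompleted(Map<String, Object> storedOffset) {
    return storedOffset != null && !storedOffset.containsKey("copy");
  }
}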
