diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
index 0f36e4f16..59526df77 100644
--- a/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
+++ b/src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java
@@ -36,6 +36,7 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -169,6 +170,9 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
         new HashMap<String, String>() {
           {
             put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
+            String namespaceRegex =
+                String.format("(%s\\.coll|%s\\.coll)", db1.getName(), db2.getName());
+            put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
           }
         };
     task.start(cfg);
@@ -178,11 +182,22 @@
         () -> assertEquals(150, firstPoll.size()),
         () -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll1),
         () -> assertSourceRecordValues(createInserts(1, 75), firstPoll, coll2),
+        // make sure all elements, except the last, contain the "copy" key
         () ->
             assertTrue(
                 firstPoll.stream()
                     .map(SourceRecord::sourceOffset)
-                    .allMatch(i -> i.containsKey("copy"))));
+                    .limit(150 - 1) // exclude the last record
+                    .allMatch(i -> i.containsKey("copy"))),
+        // make sure that the last copied element does not have the "copy" key
+        () ->
+            assertTrue(
+                firstPoll.stream()
+                    .map(SourceRecord::sourceOffset)
+                    .skip(150 - 1) // exclude the last record
+                    .findFirst()
+                    .filter(i -> !i.containsKey("copy"))
+                    .isPresent())));

     assertNull(task.poll());

@@ -533,6 +548,7 @@ void testSourceCanUseCustomOffsetPartitionNames() {
   @Test
   @DisplayName("Copy existing with a restart midway through")
   void testCopyingExistingWithARestartMidwayThrough() {
+    assumeTrue(isGreaterThanThreeDotSix());
     try (AutoCloseableSourceTask task = createSourceTask()) {

       MongoCollection<Document> coll = getCollection();
@@ -544,7 +560,7 @@
               put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
               put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
               put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
-              put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "10000");
+              put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
             }
           };

@@ -556,6 +572,17 @@
       assertTrue(
           firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));

+      Map<String, ?> lastOffset = firstPoll.get(25 - 1).sourceOffset();
+
+      // mock the context so that on restart we know where the last task left off
+      when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+      assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
+      @SuppressWarnings("unchecked")
+      Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
+      when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
+      task.initialize(context);
+
+      // perform a restart
       task.stop();
       task.start(cfg);

@@ -566,8 +593,20 @@ void testCopyingExistingWithARestartMidwayThrough() {
       List<SourceRecord> thirdPoll = getNextBatch(task);
       assertSourceRecordValues(createInserts(26, 50), thirdPoll, coll);
+      // Make sure all elements, except the last one, contain the "copy" key
       assertTrue(
-          thirdPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
+          thirdPoll.stream()
+              .map(SourceRecord::sourceOffset)
+              .limit(25 - 1) // exclude the last record in the batch
+              .allMatch(i -> i.containsKey("copy")));
+      // Make sure the last copied element does not contain the "copy" key
+      assertTrue(
+          thirdPoll.stream()
+              .map(SourceRecord::sourceOffset)
+              .skip(25 - 1) // exclude the last record in the batch
+              .findFirst()
+              .filter(i -> !i.containsKey("copy"))
+              .isPresent());

       assertTrue(getNextBatch(task).isEmpty());

       insertMany(rangeClosed(51, 75), coll);
@@ -579,6 +618,72 @@
     }
   }

+  @Test
+  @DisplayName("Copy existing with a restart after finishing")
+  void testCopyingExistingWithARestartAfterFinishing() {
+    assumeTrue(isGreaterThanThreeDotSix());
+    try (AutoCloseableSourceTask task = createSourceTask()) {
+
+      MongoCollection<Document> coll = getCollection();
+
+      HashMap<String, String> cfg =
+          new HashMap<String, String>() {
+            {
+              put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
+              put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
+              put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
+              put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
+              put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
+            }
+          };
+
+      insertMany(rangeClosed(1, 50), coll);
+      task.start(cfg);
+
+      List<SourceRecord> firstPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(1, 25), firstPoll, coll);
+      assertTrue(
+          firstPoll.stream().map(SourceRecord::sourceOffset).allMatch(i -> i.containsKey("copy")));
+
+      List<SourceRecord> secondPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(26, 50), secondPoll, coll);
+      // Make sure all elements, except the last one, contain the "copy" key
+      assertTrue(
+          secondPoll.stream()
+              .map(SourceRecord::sourceOffset)
+              .limit(25 - 1) // exclude the last record in the batch
+              .allMatch(i -> i.containsKey("copy")));
+
+      Map<String, ?> lastOffset = secondPoll.get(25 - 1).sourceOffset();
+
+      // Make sure the last copied element does not contain the "copy" key
+      assertFalse(lastOffset.containsKey("copy"));
+
+      // mock the context so that on restart we know where the last task left off
+      when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+      assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
+      @SuppressWarnings("unchecked")
+      Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
+      when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
+      task.initialize(context);
+
+      // perform a restart
+      task.stop();
+      task.start(cfg);
+
+      // make sure that a copy doesn't occur again because all data was already copied
+      assertTrue(getNextBatch(task).isEmpty());
+
+      // make sure that we can continue to process data
+      insertMany(rangeClosed(51, 75), coll);
+
+      List<SourceRecord> thirdPoll = getNextBatch(task);
+      assertSourceRecordValues(createInserts(51, 75), thirdPoll, coll);
+      assertFalse(
+          thirdPoll.stream().map(SourceRecord::sourceOffset).anyMatch(i -> i.containsKey("copy")));
+    }
+  }
+
   @Test
   @DisplayName("Ensure source loads data from collection and outputs documents only")
   void testSourceLoadsDataFromCollectionDocumentOnly() {
diff --git a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
index 5eb4edabe..ad1f4e4cb 100644
--- a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
+++ b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java
@@ -48,6 +48,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -210,68 +211,81 @@ private List<SourceRecord> pollInternal() {
         createValueSchemaAndValueProvider(sourceConfig);

     List<SourceRecord> sourceRecords = new ArrayList<>();
-    getNextBatch()
-        .forEach(
-            changeStreamDocument -> {
-              Map<String, String> sourceOffset = new HashMap<>();
-              sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
-              if (isCopying) {
-                sourceOffset.put(COPY_KEY, "true");
-              }
+    Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
+    while (batchIterator.hasNext()) {
+      BsonDocument changeStreamDocument = batchIterator.next();
+      Map<String, String> sourceOffset = new HashMap<>();
+      sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
+      if (isCopying) {
+        sourceOffset.put(COPY_KEY, "true");
+      }
-
-              String topicName = topicMapper.getTopic(changeStreamDocument);
-              if (topicName.isEmpty()) {
-                LOGGER.warn(
-                    "No topic set. Could not publish the message: {}",
-                    changeStreamDocument.toJson());
-              } else {
-
-                Optional<BsonDocument> valueDocument = Optional.empty();
-
-                boolean isTombstoneEvent =
-                    publishFullDocumentOnlyTombstoneOnDelete
-                        && !changeStreamDocument.containsKey(FULL_DOCUMENT);
-                if (publishFullDocumentOnly) {
-                  if (changeStreamDocument.containsKey(FULL_DOCUMENT)
-                      && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
-                    valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
-                  }
-                } else {
-                  valueDocument = Optional.of(changeStreamDocument);
-                }
-
-                if (valueDocument.isPresent() || isTombstoneEvent) {
-                  BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
-                  LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);
-
-                  if (valueDoc instanceof RawBsonDocument) {
-                    int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
-                    statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
-                  }
-
-                  BsonDocument keyDocument;
-                  if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
-                    keyDocument = changeStreamDocument;
-                  } else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
-                      && changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
-                    keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
-                  } else {
-                    keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
-                  }
-
-                  createSourceRecord(
-                      keySchemaAndValueProducer,
-                      isTombstoneEvent
-                          ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
-                          : valueSchemaAndValueProducer,
-                      sourceOffset,
-                      topicName,
-                      keyDocument,
-                      valueDoc)
-                      .map(sourceRecords::add);
-                }
-              }
-            });
+
+      // if isCopying is true, we want to set the COPY_KEY flag so that kafka has context that a
+      // copy is in progress. However, for the last document that we are copying, we should not set
+      // this flag because the copy has completed, otherwise we are relying on future change stream
+      // events to signify that we are no longer copying. We also need to set the _id field to be a
+      // valid resume token, which during copying exists in the cachedResumeToken variable.
+      // In version 3.6 of MongoDB the cachedResumeToken initializes to null, so we need to avoid
+      // a null pointer exception.
+      boolean lastDocument = !batchIterator.hasNext();
+      boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
+      if (isCopying && lastDocument && noMoreDataToCopy && cachedResumeToken != null) {
+        sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
+        sourceOffset.remove(COPY_KEY);
+      }
+
+      String topicName = topicMapper.getTopic(changeStreamDocument);
+      if (topicName.isEmpty()) {
+        LOGGER.warn(
+            "No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
+      } else {
+
+        Optional<BsonDocument> valueDocument = Optional.empty();
+
+        boolean isTombstoneEvent =
+            publishFullDocumentOnlyTombstoneOnDelete
+                && !changeStreamDocument.containsKey(FULL_DOCUMENT);
+        if (publishFullDocumentOnly) {
+          if (changeStreamDocument.containsKey(FULL_DOCUMENT)
+              && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
+            valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
+          }
+        } else {
+          valueDocument = Optional.of(changeStreamDocument);
+        }
+
+        if (valueDocument.isPresent() || isTombstoneEvent) {
+          BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
+          LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);
+
+          if (valueDoc instanceof RawBsonDocument) {
+            int sizeBytes = ((RawBsonDocument) valueDoc).getByteBuffer().limit();
+            statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
+          }
+
+          BsonDocument keyDocument;
+          if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
+            keyDocument = changeStreamDocument;
+          } else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
+              && changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
+            keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
+          } else {
+            keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
+          }
+
+          createSourceRecord(
+                  keySchemaAndValueProducer,
+                  isTombstoneEvent
+                      ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER
+                      : valueSchemaAndValueProducer,
+                  sourceOffset,
+                  topicName,
+                  keyDocument,
+                  valueDoc)
+              .map(sourceRecords::add);
+        }
+      }
+    }

     LOGGER.debug("Return batch of {}", sourceRecords.size());
     if (sourceRecords.isEmpty()) {
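
The offset bookkeeping in the hunk above is the core of the change: every record emitted while copying carries a "copy" marker in its source offset, but the very last copied record instead drops the marker and points its "_id" at the cached change stream resume token, so a connector restarted after the copy finishes resumes the change stream rather than re-copying everything. Below is a minimal standalone sketch of that decision; offsetFor, its parameters, and the sample token strings are illustrative stand-ins and not part of the connector API.

import java.util.HashMap;
import java.util.Map;

public class CopyOffsetSketch {

  // During the copy phase every record's offset is {"_id": <token>, "copy": "true"}.
  // For the last record of a batch, when no more data remains to copy and a cached
  // resume token exists, the "copy" flag is removed and "_id" is replaced with the
  // cached change stream resume token.
  static Map<String, String> offsetFor(
      String recordId,
      String cachedResumeToken,
      boolean isCopying,
      boolean lastDocument,
      boolean noMoreDataToCopy) {
    Map<String, String> sourceOffset = new HashMap<>();
    sourceOffset.put("_id", recordId);
    if (isCopying) {
      sourceOffset.put("copy", "true");
    }
    if (isCopying && lastDocument && noMoreDataToCopy && cachedResumeToken != null) {
      sourceOffset.put("_id", cachedResumeToken);
      sourceOffset.remove("copy");
    }
    return sourceOffset;
  }

  public static void main(String[] args) {
    String cachedResumeToken = "{\"_data\": \"8263...\"}"; // hypothetical resume token JSON
    // Mid-copy record: flagged, so a restart knows the copy was still in progress.
    System.out.println(offsetFor("{\"_id\": {\"copy\": 1}}", cachedResumeToken, true, false, true));
    // Last copied record: a plain resume-token offset, so a restart continues the change stream.
    System.out.println(offsetFor("{\"_id\": {\"copy\": 50}}", cachedResumeToken, true, true, false));
  }
}

This is exactly the shape the integration tests above assert: every offset in a copy batch except the last contains the "copy" key, and the last one does not.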