diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 48f7b3dfad1..8488003fce9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -2828,8 +2828,13 @@ boolean shouldSyncOffsetFromSnapshot(DefaultPubSubMessage consumerRecord, Partit if (payloadUnion instanceof Put && ((Put) payloadUnion).getSchemaId() != CHUNK_SCHEMA_ID) { return true; // Global RT DIV message can be multiple chunks + deletes, only sync on one Put (manifest or value) } + } else if (isNonSegmentControlMessage(consumerRecord, null)) { + return true; // sync when processing most control messages } - return isNonSegmentControlMessage(consumerRecord, null); + + // this threshold must be greater than the interval in shouldSendGlobalRtDiv() so the two do not interfere + final long syncBytesInterval = getSyncBytesInterval(pcs); // size-based sync condition + return syncBytesInterval > 0 && (pcs.getProcessedRecordSizeSinceLastSync() >= 2 * syncBytesInterval); } /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 28c099a72ec..4659dca340d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1934,6 +1934,10 @@ protected void updateOffsetMetadataAndSyncOffset(@Nonnull PartitionConsumptionSt } protected void updateOffsetMetadataAndSyncOffset(DataIntegrityValidator div, @Nonnull PartitionConsumptionState pcs) { + if (isGlobalRtDivEnabled()) { + LOGGER.info("Skipping updateOffsetMetadataAndSyncOffset() because Global RT DIV is enabled."); + return; + } /** * Offset metadata and producer states must be updated at the same time in OffsetRecord; otherwise, one checkpoint * could be ahead of the other. @@ -2918,7 +2922,9 @@ private void syncOffset(PartitionConsumptionState pcs) { String msg = "Offset synced for replica: " + pcs.getReplicaId() + " - localVtPosition: {} progress: {}"; if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { - final PubSubPosition position = offsetRecord.getCheckpointedLocalVtPosition(); + final PubSubPosition position = (isGlobalRtDivEnabled()) + ?
offsetRecord.getLatestConsumedVtPosition() + : offsetRecord.getCheckpointedLocalVtPosition(); int percentage = -1; if (getServerConfig().isIngestionProgressLoggingEnabled()) { final PubSubTopicPartition topicPartition = pcs.getReplicaTopicPartition(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/DataIntegrityValidator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/DataIntegrityValidator.java index 2064213c5fe..7220494acbd 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/DataIntegrityValidator.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/DataIntegrityValidator.java @@ -12,6 +12,7 @@ import com.linkedin.venice.pubsub.api.DefaultPubSubMessage; import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.utils.SparseConcurrentList; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; import java.util.HashSet; import java.util.Map; @@ -103,7 +104,7 @@ public void setPartitionState( PartitionTracker.TopicType type, int partition, Map producerPartitionStateMap) { - registerPartition(partition).setPartitionState(type, producerPartitionStateMap, DISABLED); + registerPartition(partition).setPartitionState(type, producerPartitionStateMap, this.maxAgeInMs); } /** @@ -139,7 +140,7 @@ public void updateOffsetRecordForPartition( public void cloneVtProducerStates(int partition, DataIntegrityValidator newValidator) { PartitionTracker destPartitionTracker = newValidator.registerPartition(partition); - registerPartition(partition).cloneVtProducerStates(destPartitionTracker); + registerPartition(partition).cloneVtProducerStates(destPartitionTracker, maxAgeInMs); } /** @@ -147,8 +148,8 @@ public void cloneVtProducerStates(int partition, DataIntegrityValidator newValid */ public PartitionTracker cloneRtProducerStates(int partition, String brokerUrl) { PartitionTracker clonedPartitionTracker = partitionTrackerCreator.apply(partition); - final PartitionTracker existingPartitionTracker = registerPartition(partition); - existingPartitionTracker.cloneRtProducerStates(clonedPartitionTracker, brokerUrl); // for a single broker + final PartitionTracker oldPartitionTracker = registerPartition(partition); + oldPartitionTracker.cloneRtProducerStates(clonedPartitionTracker, brokerUrl, maxAgeInMs); // for a single broker return clonedPartitionTracker; } @@ -157,8 +158,8 @@ public PartitionTracker cloneRtProducerStates(int partition, String brokerUrl) { */ public PartitionTracker cloneVtProducerStates(int partition) { PartitionTracker clonedPartitionTracker = partitionTrackerCreator.apply(partition); - final PartitionTracker existingPartitionTracker = registerPartition(partition); - existingPartitionTracker.cloneVtProducerStates(clonedPartitionTracker); // for a single broker + final PartitionTracker oldPartitionTracker = registerPartition(partition); + oldPartitionTracker.cloneVtProducerStates(clonedPartitionTracker, maxAgeInMs); // for a single broker return clonedPartitionTracker; } @@ -215,7 +216,7 @@ public boolean hasGlobalRtDivState(int partition) { LOGGER.info("PartitionTracker is null for partition: {}", partition); return false; } - Map> rtSegments = partitionTracker.getAllRtSegmentsForTesting(); + Map> rtSegments = partitionTracker.getRtSegmentsForTesting(); for (Map segments: rtSegments.values()) { if (!segments.isEmpty()) { LOGGER.info("RT DIV state size: {}", segments.size()); diff --git 
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java index 7b568892466..c4ade13f8fe 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java @@ -153,15 +153,15 @@ Segment getSegment(TopicType type, GUID guid) { } public void setPartitionState(TopicType type, OffsetRecord offsetRecord, long maxAgeInMs) { - long minimumRequiredRecordProducerTimestamp = - maxAgeInMs == DISABLED ? DISABLED : offsetRecord.getMaxMessageTimeInMs() - maxAgeInMs; - setPartitionState(type, offsetRecord.getProducerPartitionStateMap(), minimumRequiredRecordProducerTimestamp); + long earliestAllowableTimestamp = + maxAgeInMs == DISABLED ? DISABLED : offsetRecord.calculateLatestMessageTimeInMs() - maxAgeInMs; + setPartitionState(type, offsetRecord.getProducerPartitionStateMap(), earliestAllowableTimestamp); } public void setPartitionState( TopicType type, Map producerPartitionStateMap, - long minimumRequiredRecordProducerTimestamp) { + long earliestAllowableTimestamp) { Iterator> iterator = producerPartitionStateMap.entrySet().iterator(); Map.Entry entry; @@ -171,7 +171,7 @@ public void setPartitionState( entry = iterator.next(); producerGuid = GuidUtils.getGuidFromCharSequence(entry.getKey()); producerPartitionState = entry.getValue(); - if (producerPartitionState.messageTimestamp >= minimumRequiredRecordProducerTimestamp) { + if (producerPartitionState.messageTimestamp >= earliestAllowableTimestamp) { /** * This {@link producerPartitionState} is eligible to be retained, so we'll set the state in the * {@link PartitionTracker}. @@ -210,9 +210,14 @@ private void setSegment(TopicType type, GUID guid, Segment segment) { /** * Clone the vtSegments and LCVP to the destination PartitionTracker. May be called concurrently. */ - public void cloneVtProducerStates(PartitionTracker destProducerTracker) { + public void cloneVtProducerStates(PartitionTracker destProducerTracker, long maxAgeInMs) { + long earliestAllowableTimestamp = maxAgeInMs == DISABLED ? DISABLED : new Date().getTime() - maxAgeInMs; for (Map.Entry entry: vtSegments.entrySet()) { - destProducerTracker.setSegment(PartitionTracker.VERSION_TOPIC, entry.getKey(), new Segment(entry.getValue())); + if (entry.getValue().getLastRecordTimestamp() >= earliestAllowableTimestamp) { + destProducerTracker.setSegment(PartitionTracker.VERSION_TOPIC, entry.getKey(), new Segment(entry.getValue())); + } else { + vtSegments.remove(entry.getKey()); // The state is eligible to be cleared. + } } destProducerTracker.updateLatestConsumedVtPosition(latestConsumedVtPosition.get()); } @@ -220,16 +225,24 @@ public void cloneVtProducerStates(PartitionTracker destProducerTracker) { /** * Clone the rtSegments to the destination PartitionTracker. Filter by brokerUrl. May be called concurrently. */ - public void cloneRtProducerStates(PartitionTracker destProducerTracker, String brokerUrl) { - for (Map.Entry> entry: rtSegments.entrySet()) { - if (!brokerUrl.isEmpty() && !brokerUrl.equals(entry.getKey())) { - continue; // filter by brokerUrl if specified + public void cloneRtProducerStates(PartitionTracker destProducerTracker, String brokerUrl, long maxAgeInMs) { + long earliestAllowableTimestamp = maxAgeInMs == DISABLED ? 
DISABLED : new Date().getTime() - maxAgeInMs; + for (Map.Entry> broker2Segment: rtSegments.entrySet()) { + if (!brokerUrl.equals(broker2Segment.getKey())) { + continue; // filter by the specified brokerUrl } - for (Map.Entry rtEntry: entry.getValue().entrySet()) { - destProducerTracker.setSegment( - TopicType.of(TopicType.REALTIME_TOPIC_TYPE, entry.getKey()), - rtEntry.getKey(), - new Segment(rtEntry.getValue())); + + final VeniceConcurrentHashMap rtEntries = broker2Segment.getValue(); + for (Map.Entry rtEntry: rtEntries.entrySet()) { + if (rtEntry.getValue().getLastRecordTimestamp() >= earliestAllowableTimestamp) { + TopicType realTimeTopicType = TopicType.of(TopicType.REALTIME_TOPIC_TYPE, broker2Segment.getKey()); + destProducerTracker.setSegment(realTimeTopicType, rtEntry.getKey(), new Segment(rtEntry.getValue())); + } else { + rtEntries.remove(rtEntry.getKey()); // The state is eligible to be cleared. + } + } + if (broker2Segment.getValue().isEmpty()) { + rtSegments.remove(broker2Segment.getKey()); } } } @@ -711,7 +724,7 @@ public void checkMissingMessage( } void clearExpiredStateAndUpdateOffsetRecord(TopicType type, OffsetRecord offsetRecord, long maxAgeInMs) { - long minimumRequiredRecordProducerTimestamp = offsetRecord.getMaxMessageTimeInMs() - maxAgeInMs; + long minimumRequiredRecordProducerTimestamp = offsetRecord.calculateLatestMessageTimeInMs() - maxAgeInMs; int numberOfClearedGUIDs = 0; Iterator> iterator = getSegments(type).entrySet().iterator(); Map.Entry entry; @@ -747,14 +760,15 @@ public void removeProducerState(TopicType type, GUID guid, OffsetRecord offsetRe } @VisibleForTesting - Map> getAllRtSegmentsForTesting() { - return rtSegments.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> Collections.unmodifiableMap(entry.getValue()))); + VeniceConcurrentHashMap> getRtSegmentsForTesting() { + return rtSegments; } @VisibleForTesting - Map getVtSegmentsForTesting() { + VeniceConcurrentHashMap getVtSegmentsForTesting() { return vtSegments; } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index 1b5ab1aebc2..8286fd4e467 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -737,6 +737,31 @@ public void testShouldSyncOffsetFromSnapshot() throws InterruptedException { doReturn(ControlMessageType.START_OF_SEGMENT.getValue()).when(mockControlMessage).getControlMessageType(); assertFalse( mockIngestionTask.shouldSyncOffsetFromSnapshot(nonSegmentControlMessage, mockPartitionConsumptionState)); + + // Mock the getSyncBytesInterval method to return a specific value + doReturn(1000L).when(mockIngestionTask).getSyncBytesInterval(any()); + + // Create a mock message that is neither a Global RT DIV nor a control message + final DefaultPubSubMessage regularMessage = getMockMessage(3).getMessage(); + KafkaKey regularMockKey = regularMessage.getKey(); + doReturn(false).when(regularMockKey).isGlobalRtDiv(); + doReturn(false).when(regularMockKey).isControlMessage(); + + // Test case 1: When
processedRecordSizeSinceLastSync is less than 2*syncBytesInterval + doReturn(1500L).when(mockPartitionConsumptionState).getProcessedRecordSizeSinceLastSync(); + assertFalse(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState)); + + // Test case 2: When processedRecordSizeSinceLastSync is equal to 2*syncBytesInterval + doReturn(2000L).when(mockPartitionConsumptionState).getProcessedRecordSizeSinceLastSync(); + assertTrue(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState)); + + // Test case 3: When processedRecordSizeSinceLastSync is greater than 2*syncBytesInterval + doReturn(2500L).when(mockPartitionConsumptionState).getProcessedRecordSizeSinceLastSync(); + assertTrue(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState)); + + // Test case 4: When syncBytesInterval is 0 (disabled) + doReturn(0L).when(mockIngestionTask).getSyncBytesInterval(any()); + assertFalse(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState)); } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/DataIntegrityValidatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/DataIntegrityValidatorTest.java index 478a1b84d2c..5d6e677b5b6 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/DataIntegrityValidatorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/DataIntegrityValidatorTest.java @@ -378,8 +378,8 @@ private static DefaultPubSubMessage buildRecord( messageEnvelope.leaderMetadataFooter.upstreamPubSubPosition = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer(); messageEnvelope.payloadUnion = payload; - if (offsetRecord != null && offsetRecord.getMaxMessageTimeInMs() < brokerTimestamp) { - when(offsetRecord.getMaxMessageTimeInMs()).thenReturn(brokerTimestamp); + if (offsetRecord != null && offsetRecord.calculateLatestMessageTimeInMs() < brokerTimestamp) { + when(offsetRecord.calculateLatestMessageTimeInMs()).thenReturn(brokerTimestamp); } return new ImmutablePubSubMessage( diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java index 5738551d2b4..e538bda8013 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java @@ -6,6 +6,7 @@ import static com.linkedin.venice.kafka.validation.checksum.CheckSumType.MD5; import static com.linkedin.venice.kafka.validation.checksum.CheckSumType.NONE; import static com.linkedin.venice.utils.TestUtils.DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertThrows; @@ -39,18 +40,24 @@ import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.DefaultPubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.pubsub.mock.InMemoryPubSubPosition; +import 
com.linkedin.venice.pubsub.mock.InMemoryPubSubPositionFactory; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.avro.specific.FixedSize; import org.apache.logging.log4j.LogManager; @@ -70,7 +77,10 @@ public class TestPartitionTracker { private PubSubTopic realTimeTopic; private PubSubTopic versionTopic; int partitionId = 0; + public static final long MAX_AGE_IN_MS = 5000; // 5 seconds private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private final PubSubPositionDeserializer positionDeserializer = + new PubSubPositionDeserializer(InMemoryPubSubPositionFactory.getPositionTypeRegistryWithInMemoryPosition()); @BeforeMethod(alwaysRun = true) public void methodSetUp() { @@ -326,13 +336,10 @@ public void testSegmentNumber(TopicType type) { Segment secondSegment = new Segment(partitionId, skipSegmentNumber, CheckSumType.NONE); long offset = 10; - // Send the first segment + // Send the first segment with sequence number 0 ControlMessage startOfSegment = getStartOfSegment(); KafkaMessageEnvelope startOfSegmentMessage = - getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, guid, firstSegment, Optional.empty(), startOfSegment); // sequence - // number - // is - // zero + getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, guid, firstSegment, Optional.empty(), startOfSegment); DefaultPubSubMessage controlMessageConsumerRecord = new ImmutablePubSubMessage( getControlMessageKey(startOfSegmentMessage), startOfSegmentMessage, @@ -381,13 +388,10 @@ public void testDuplicateMsgsDetected(TopicType type, CheckSumType checkSumType) Segment firstSegment = new Segment(partitionId, 0, checkSumType); long offset = 10; - // Send SOS + // Send SOS with sequence number 0 ControlMessage startOfSegment = getStartOfSegment(); KafkaMessageEnvelope startOfSegmentMessage = - getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, guid, firstSegment, Optional.empty(), startOfSegment); // sequence - // number - // is - // 0 + getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, guid, firstSegment, Optional.empty(), startOfSegment); DefaultPubSubMessage controlMessageConsumerRecord = new ImmutablePubSubMessage( getControlMessageKey(startOfSegmentMessage), startOfSegmentMessage, @@ -398,12 +402,10 @@ public void testDuplicateMsgsDetected(TopicType type, CheckSumType checkSumType) partitionTracker.validateMessage(type, controlMessageConsumerRecord, true, Lazy.FALSE); Assert.assertEquals(partitionTracker.getSegment(type, guid).getSequenceNumber(), 0); - // send EOS + // Send EOS with sequence number 5 ControlMessage endOfSegment = getEndOfSegment(ByteBuffer.allocate(0)); KafkaMessageEnvelope endOfSegmentMessage = - getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, guid, firstSegment, Optional.of(5), endOfSegment); // sequence - // number - // is 5 + getKafkaMessageEnvelope(MessageType.CONTROL_MESSAGE, guid, firstSegment, Optional.of(5), endOfSegment); controlMessageConsumerRecord = new ImmutablePubSubMessage( getControlMessageKey(endOfSegmentMessage), endOfSegmentMessage, @@ -715,4 +717,110 @@ private List 
getSegmentOfMessages( return messages; } + + /** + * Test that cloneVtProducerStates removes old entries from the source tracker when maxAgeInMs threshold is exceeded. + */ + @Test(timeOut = 10 * Time.MS_PER_SECOND) + public void testCloneVtProducerStates() throws InterruptedException { + final int NUM_PRODUCERS = 3; + String destTopicNamePrefix = "dest_topic_" + System.currentTimeMillis(); + PubSubTopic destVersionTopic = pubSubTopicRepository.getTopic(destTopicNamePrefix + "_v1"); + PartitionTracker destTracker = new PartitionTracker(destVersionTopic.getName(), partitionId, positionDeserializer); + + // Create producer GUIDs, populate the source tracker's vt segments, and set the latest consumed vt position + List guids = createGuids(NUM_PRODUCERS); + List oldGuids = Collections.singletonList(guids.get(0)); + VeniceConcurrentHashMap srcVtSegments = partitionTracker.getVtSegmentsForTesting(); + createTestSegments(System.currentTimeMillis(), srcVtSegments, guids); + Map destVtSegments = destTracker.getVtSegmentsForTesting(); + List> allSegments = Arrays.asList(srcVtSegments, destVtSegments); + PubSubPosition testPosition = ApacheKafkaOffsetPosition.of(12345L); + partitionTracker.updateLatestConsumedVtPosition(testPosition); + + // Clone with DISABLED maxAgeInMs. Verify that all segments were cloned, even the old segments. + partitionTracker.cloneVtProducerStates(destTracker, DataIntegrityValidator.DISABLED); + allSegments.forEach(segments -> assertEquals(segments.size(), 3, "All segments should be present")); + guids.forEach(guid -> assertTrue(srcVtSegments.containsKey(guid) && destVtSegments.containsKey(guid))); + assertEquals(destTracker.getLatestConsumedVtPosition(), testPosition, "latestConsumedVtPosition should be copied"); + + // Clone producer states using maxAgeInMs. + // Old segments should be removed from the source tracker and not be cloned to the dest tracker. + destVtSegments.clear(); + partitionTracker.cloneVtProducerStates(destTracker, MAX_AGE_IN_MS); + allSegments.forEach(segments -> { + assertEquals(segments.size(), 2, "Segment 1 (very old) should've been removed so size goes 3->2"); + guids.forEach(guid -> assertTrue(oldGuids.contains(guid) ^ segments.containsKey(guid))); + }); + assertEquals(destTracker.getLatestConsumedVtPosition(), testPosition, "latestConsumedVtPosition should be copied"); + } + + /** + * Test that cloneRtProducerStates removes old entries from the source tracker when maxAgeInMs threshold is exceeded.
+ */ + @Test(timeOut = 10 * Time.MS_PER_SECOND) + public void testCloneRtProducerStates() { + final int NUM_PRODUCERS = 3; + String destTopicNamePrefix = "dest_topic_" + System.currentTimeMillis(); + PubSubTopic destVersionTopic = pubSubTopicRepository.getTopic(destTopicNamePrefix + "_v1"); + PartitionTracker destTracker = new PartitionTracker(destVersionTopic.getName(), partitionId, positionDeserializer); + String brokerUrl = "testBrokerUrl"; + String expiredBrokerUrl = "expiredBrokerUrl"; + + List guids = createGuids(NUM_PRODUCERS); + List oldGuids = Collections.singletonList(guids.get(0)); + Map> allSrcRtSegments = partitionTracker.getRtSegmentsForTesting(); + VeniceConcurrentHashMap srcRtSegments = new VeniceConcurrentHashMap<>(); + allSrcRtSegments.computeIfAbsent(brokerUrl, k -> srcRtSegments); + + createTestSegments(System.currentTimeMillis(), srcRtSegments, guids); + Segment oldSegment = srcRtSegments.get(guids.get(0)); + allSrcRtSegments.computeIfAbsent(expiredBrokerUrl, k -> new VeniceConcurrentHashMap<>()); + guids.forEach(guid -> allSrcRtSegments.get(expiredBrokerUrl).put(guid, oldSegment)); + + // Clone the producer states for the expired broker. They should be removed on the source and not cloned to dest. + partitionTracker.cloneRtProducerStates(destTracker, expiredBrokerUrl, MAX_AGE_IN_MS); + Map> allDestRtSegments = destTracker.getRtSegmentsForTesting(); + Arrays.asList(allSrcRtSegments, allDestRtSegments).forEach(rtSegments -> { + assertFalse(rtSegments.containsKey(expiredBrokerUrl), "The expired broker entry should've been removed"); + }); + + // Clone the producer states for the primary broker. Only recent and borderline segments should be cloned. + partitionTracker.cloneRtProducerStates(destTracker, brokerUrl, MAX_AGE_IN_MS); + Arrays.asList(allSrcRtSegments, allDestRtSegments).forEach(rtSegments -> { + assertEquals(rtSegments.size(), 1, "Only one broker URL should be remaining"); + assertTrue(rtSegments.containsKey(brokerUrl), "The broker URL should've been cloned"); + + VeniceConcurrentHashMap segments = rtSegments.get(brokerUrl); + assertEquals(segments.size(), 2, "Segment 1 (very old) should've been removed so size goes 3->2"); + guids.forEach(guid -> Assert.assertTrue(oldGuids.contains(guid) ^ segments.containsKey(guid))); + }); + } + + private List createGuids(int numGuids) { + List guids = new ArrayList<>(); + for (int i = 0; i < numGuids; i++) { + guids.add(GuidUtils.getGUID(VeniceProperties.empty())); + } + return guids; + } + + private void createTestSegments(long t, VeniceConcurrentHashMap segments, List guids) { + assertTrue(guids.size() >= 3, "Need at least 3 GUIDs to create test segments"); + + // Segment 1: Very old (should be removed) + Segment oldSegment = new Segment(partitionId, 0, CheckSumType.NONE); + oldSegment.setLastRecordTimestamp(t - MAX_AGE_IN_MS - 2000); // Expired by 2 seconds + segments.put(guids.get(0), oldSegment); + + // Segment 2: Recent (should be retained) + Segment recentSegment = new Segment(partitionId, 1, CheckSumType.NONE); + recentSegment.setLastRecordTimestamp(t - 1000); // Only 1 second old + segments.put(guids.get(1), recentSegment); + + // Segment 3: Borderline recent (should be retained) + Segment borderlineSegment = new Segment(partitionId, 2, CheckSumType.NONE); + borderlineSegment.setLastRecordTimestamp(t - MAX_AGE_IN_MS + 1000); // Just within threshold + segments.put(guids.get(2), borderlineSegment); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java 
b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java index 7c7b22d34dd..2658c1ff5fd 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java @@ -1,9 +1,8 @@ package com.linkedin.venice.offsets; -import static com.linkedin.venice.guid.GuidUtils.guidToUtf8; - import com.linkedin.venice.annotation.VisibleForTesting; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.guid.GuidUtils; import com.linkedin.venice.kafka.protocol.GUID; import com.linkedin.venice.kafka.protocol.state.IncrementalPushReplicaStatus; import com.linkedin.venice.kafka.protocol.state.PartitionState; @@ -180,12 +179,12 @@ public void setLastCheckpointTimestamp(long timestamp) { /** * @return the last messageTimeStamp across all producers tracked by this OffsetRecord */ - public long getMaxMessageTimeInMs() { - long maxMessageTimestamp = -1; - for (ProducerPartitionState state: this.partitionState.producerStates.values()) { - maxMessageTimestamp = Math.max(maxMessageTimestamp, state.messageTimestamp); - } - return maxMessageTimestamp; + public long calculateLatestMessageTimeInMs() { + return this.partitionState.producerStates.values() + .stream() + .mapToLong(ProducerPartitionState::getMessageTimestamp) + .max() + .orElse(-1); } public long getLatestProducerProcessingTimeInMs() { @@ -205,11 +204,11 @@ public boolean isEndOfPushReceived() { } public synchronized void setProducerPartitionState(GUID producerGuid, ProducerPartitionState state) { - this.partitionState.producerStates.put(guidToUtf8(producerGuid), state); + this.partitionState.producerStates.put(GuidUtils.guidToUtf8(producerGuid), state); } public synchronized void removeProducerPartitionState(GUID producerGuid) { - this.partitionState.producerStates.remove(guidToUtf8(producerGuid)); + this.partitionState.producerStates.remove(GuidUtils.guidToUtf8(producerGuid)); } public synchronized Map getProducerPartitionStateMap() { @@ -222,14 +221,14 @@ public synchronized void setRealtimeTopicProducerState( ProducerPartitionState state) { partitionState.getRealtimeTopicProducerStates() .computeIfAbsent(kafkaUrl, url -> new VeniceConcurrentHashMap<>()) - .put(guidToUtf8(producerGuid), state); + .put(GuidUtils.guidToUtf8(producerGuid), state); } public synchronized void removeRealTimeTopicProducerState(String kafkaUrl, GUID producerGuid) { if (partitionState.getRealtimeTopicProducerStates().get(kafkaUrl) == null) { return; } - partitionState.getRealtimeTopicProducerStates().get(kafkaUrl).remove(guidToUtf8(producerGuid)); + partitionState.getRealtimeTopicProducerStates().get(kafkaUrl).remove(GuidUtils.guidToUtf8(producerGuid)); } public synchronized ProducerPartitionState getRealTimeProducerState(String kafkaUrl, GUID producerGuid) { @@ -237,7 +236,7 @@ public synchronized ProducerPartitionState getRealTimeProducerState(String kafka if (map == null) { return null; } - return map.get(guidToUtf8(producerGuid)); + return map.get(GuidUtils.guidToUtf8(producerGuid)); } private Map> getRealTimeProducerState() { @@ -245,7 +244,7 @@ private Map> getRealTimeProduc } public synchronized ProducerPartitionState getProducerPartitionState(GUID producerGuid) { - return getProducerPartitionStateMap().get(guidToUtf8(producerGuid)); + return getProducerPartitionStateMap().get(GuidUtils.guidToUtf8(producerGuid)); } public void setDatabaseInfo(Map databaseInfo) { @@ -406,7 +405,7 @@ public void 
setTrackingIncrementalPushStatus( public String toString() { return "OffsetRecord{" + "localVtPosition=" + getCheckpointedLocalVtPosition() + ", remoteVtPosition=" + getCheckpointedRemoteVtPosition() + ", rtPositions=" + getPartitionUpstreamPositionString() + ", leaderTopic=" - + getLeaderTopic() + ", offsetLag=" + getOffsetLag() + ", eventTimeEpochMs=" + getMaxMessageTimeInMs() + + getLeaderTopic() + ", offsetLag=" + getOffsetLag() + ", eventTimeEpochMs=" + calculateLatestMessageTimeInMs() + ", latestProducerProcessingTimeInMs=" + getLatestProducerProcessingTimeInMs() + ", isEndOfPushReceived=" + isEndOfPushReceived() + ", databaseInfo=" + getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + ", recordTransformerClassHash=" + getRecordTransformerClassHash()