@@ -2828,8 +2828,13 @@ boolean shouldSyncOffsetFromSnapshot(DefaultPubSubMessage consumerRecord, Partit
if (payloadUnion instanceof Put && ((Put) payloadUnion).getSchemaId() != CHUNK_SCHEMA_ID) {
return true; // Global RT DIV message can be multiple chunks + deletes, only sync on one Put (manifest or value)
}
} else if (isNonSegmentControlMessage(consumerRecord, null)) {
return true; // sync when processing most control messages
}
return isNonSegmentControlMessage(consumerRecord, null);

// must be greater than the interval in shouldSendGlobalRtDiv() to not interfere
final long syncBytesInterval = getSyncBytesInterval(pcs); // size-based sync condition
return syncBytesInterval > 0 && (pcs.getProcessedRecordSizeSinceLastSync() >= 2 * syncBytesInterval);
@lluwm (Contributor) commented on Jan 12, 2026:
I have a question about this implementation, since it uses the running sum of record sizes from pcs. There seems to be a problem:

  1. shouldSyncOffsetFromSnapshot is called in the consumer thread.
  2. pcs.processedRecordSizeSinceLastSync is cleared later, in the drainer thread, when the offset is synced.
  3. There is a memory buffer between the consumer and drainer threads.

So once the size-based condition is triggered for one record in the consumer thread, it could keep firing for every subsequent record for quite some time, until that first record is finally synced in the drainer thread. Because of that, we could trigger many more offset-sync operations than necessary.
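One possible mitigation, sketched below purely as an illustration (the guard class and method names are assumptions, not part of this PR), is to let the consumer thread request at most one offset sync at a time and have the drainer thread clear the flag once the sync completes:

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch, not code from this PR: a per-partition guard so the size-based
 * condition only requests one offset sync at a time, instead of firing for every record
 * until the drainer thread clears processedRecordSizeSinceLastSync.
 */
class SizeBasedSyncGuard {
  private final AtomicBoolean syncRequested = new AtomicBoolean(false);

  /** Called on the consumer thread for each processed record. */
  boolean shouldRequestSync(long processedBytesSinceLastSync, long syncBytesInterval) {
    if (syncBytesInterval <= 0 || processedBytesSinceLastSync < 2 * syncBytesInterval) {
      return false;
    }
    // Only the first record past the threshold wins; later records are suppressed
    // until the drainer thread acknowledges the sync below.
    return syncRequested.compareAndSet(false, true);
  }

  /** Called on the drainer thread after the offset record has been persisted. */
  void onSyncCompleted() {
    syncRequested.set(false);
  }
}

Such a flag could just as well live on PartitionConsumptionState next to processedRecordSizeSinceLastSync; the point is only that compareAndSet suppresses the repeated firing described in the comment.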

}

/**
@@ -1934,6 +1934,10 @@ protected void updateOffsetMetadataAndSyncOffset(@Nonnull PartitionConsumptionSt
}

protected void updateOffsetMetadataAndSyncOffset(DataIntegrityValidator div, @Nonnull PartitionConsumptionState pcs) {
if (isGlobalRtDivEnabled()) {
LOGGER.info("Skipping updateOffsetMetadataAndSyncOffset() because Global RT DIV is enabled.");
return;
}
/**
* Offset metadata and producer states must be updated at the same time in OffsetRecord; otherwise, one checkpoint
* could be ahead of the other.
@@ -2918,7 +2922,9 @@ private void syncOffset(PartitionConsumptionState pcs) {

String msg = "Offset synced for replica: " + pcs.getReplicaId() + " - localVtPosition: {} progress: {}";
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) {
final PubSubPosition position = offsetRecord.getCheckpointedLocalVtPosition();
final PubSubPosition position = (isGlobalRtDivEnabled())
? offsetRecord.getLatestConsumedVtPosition()
: offsetRecord.getCheckpointedLocalVtPosition();
int percentage = -1;
if (getServerConfig().isIngestionProgressLoggingEnabled()) {
final PubSubTopicPartition topicPartition = pcs.getReplicaTopicPartition();
@@ -12,6 +12,7 @@
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.utils.SparseConcurrentList;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import java.util.HashSet;
import java.util.Map;
@@ -103,7 +104,7 @@ public void setPartitionState(
PartitionTracker.TopicType type,
int partition,
Map<CharSequence, ProducerPartitionState> producerPartitionStateMap) {
registerPartition(partition).setPartitionState(type, producerPartitionStateMap, DISABLED);
registerPartition(partition).setPartitionState(type, producerPartitionStateMap, this.maxAgeInMs);
}

/**
@@ -139,16 +140,16 @@ public void updateOffsetRecordForPartition(

public void cloneVtProducerStates(int partition, DataIntegrityValidator newValidator) {
PartitionTracker destPartitionTracker = newValidator.registerPartition(partition);
registerPartition(partition).cloneVtProducerStates(destPartitionTracker);
registerPartition(partition).cloneVtProducerStates(destPartitionTracker, maxAgeInMs);
}

/**
* Returns the RT DIV state for a given partition and broker URL pair.
*/
public PartitionTracker cloneRtProducerStates(int partition, String brokerUrl) {
PartitionTracker clonedPartitionTracker = partitionTrackerCreator.apply(partition);
final PartitionTracker existingPartitionTracker = registerPartition(partition);
existingPartitionTracker.cloneRtProducerStates(clonedPartitionTracker, brokerUrl); // for a single broker
final PartitionTracker oldPartitionTracker = registerPartition(partition);
oldPartitionTracker.cloneRtProducerStates(clonedPartitionTracker, brokerUrl, maxAgeInMs); // for a single broker
return clonedPartitionTracker;
}

@@ -157,8 +158,8 @@ public PartitionTracker cloneRtProducerStates(int partition, String brokerUrl) {
*/
public PartitionTracker cloneVtProducerStates(int partition) {
PartitionTracker clonedPartitionTracker = partitionTrackerCreator.apply(partition);
final PartitionTracker existingPartitionTracker = registerPartition(partition);
existingPartitionTracker.cloneVtProducerStates(clonedPartitionTracker); // for a single broker
final PartitionTracker oldPartitionTracker = registerPartition(partition);
oldPartitionTracker.cloneVtProducerStates(clonedPartitionTracker, maxAgeInMs); // for a single broker
return clonedPartitionTracker;
}

@@ -215,7 +216,7 @@ public boolean hasGlobalRtDivState(int partition) {
LOGGER.info("PartitionTracker is null for partition: {}", partition);
return false;
}
Map<String, Map<GUID, Segment>> rtSegments = partitionTracker.getAllRtSegmentsForTesting();
Map<String, VeniceConcurrentHashMap<GUID, Segment>> rtSegments = partitionTracker.getRtSegmentsForTesting();
for (Map<GUID, Segment> segments: rtSegments.values()) {
if (!segments.isEmpty()) {
LOGGER.info("RT DIV state size: {}", segments.size());
@@ -153,15 +153,15 @@ Segment getSegment(TopicType type, GUID guid) {
}

public void setPartitionState(TopicType type, OffsetRecord offsetRecord, long maxAgeInMs) {
long minimumRequiredRecordProducerTimestamp =
maxAgeInMs == DISABLED ? DISABLED : offsetRecord.getMaxMessageTimeInMs() - maxAgeInMs;
setPartitionState(type, offsetRecord.getProducerPartitionStateMap(), minimumRequiredRecordProducerTimestamp);
long earliestAllowableTimestamp =
maxAgeInMs == DISABLED ? DISABLED : offsetRecord.calculateLatestMessageTimeInMs() - maxAgeInMs;
setPartitionState(type, offsetRecord.getProducerPartitionStateMap(), earliestAllowableTimestamp);
}

public void setPartitionState(
TopicType type,
Map<CharSequence, ProducerPartitionState> producerPartitionStateMap,
long minimumRequiredRecordProducerTimestamp) {
long earliestAllowableTimestamp) {
Iterator<Map.Entry<CharSequence, ProducerPartitionState>> iterator =
producerPartitionStateMap.entrySet().iterator();
Map.Entry<CharSequence, ProducerPartitionState> entry;
@@ -171,7 +171,7 @@ public void setPartitionState(
entry = iterator.next();
producerGuid = GuidUtils.getGuidFromCharSequence(entry.getKey());
producerPartitionState = entry.getValue();
if (producerPartitionState.messageTimestamp >= minimumRequiredRecordProducerTimestamp) {
if (producerPartitionState.messageTimestamp >= earliestAllowableTimestamp) {
/**
* This {@link producerPartitionState} is eligible to be retained, so we'll set the state in the
* {@link PartitionTracker}.
@@ -210,26 +210,39 @@ private void setSegment(TopicType type, GUID guid, Segment segment) {
/**
* Clone the vtSegments and LCVP to the destination PartitionTracker. May be called concurrently.
*/
public void cloneVtProducerStates(PartitionTracker destProducerTracker) {
public void cloneVtProducerStates(PartitionTracker destProducerTracker, long maxAgeInMs) {
long earliestAllowableTimestamp = maxAgeInMs == DISABLED ? DISABLED : new Date().getTime() - maxAgeInMs;
for (Map.Entry<GUID, Segment> entry: vtSegments.entrySet()) {
destProducerTracker.setSegment(PartitionTracker.VERSION_TOPIC, entry.getKey(), new Segment(entry.getValue()));
if (entry.getValue().getLastRecordTimestamp() >= earliestAllowableTimestamp) {
destProducerTracker.setSegment(PartitionTracker.VERSION_TOPIC, entry.getKey(), new Segment(entry.getValue()));
} else {
vtSegments.remove(entry.getKey()); // The state is eligible to be cleared.
@lluwm (Contributor) commented on Jan 12, 2026:
Removing keys from a Map while iterating over it with an enhanced for loop is a problem and will generally result in a ConcurrentModificationException. Can you check PartitionTracker.clearExpiredStateAndUpdateOffsetRecord as an example implementation, or even consider calling it from here to avoid duplicating code, if possible?
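For reference, a minimal sketch of the iterator-based removal being suggested, reusing the names from the surrounding diff (vtSegments, earliestAllowableTimestamp, setSegment, getLastRecordTimestamp); this is illustrative only, not the PR's code:

import java.util.Iterator;
import java.util.Map;

// Sketch of the reviewer's suggestion: prune expired segments with an explicit Iterator
// so entries can be removed safely while iterating. GUID, Segment, and PartitionTracker
// are the types from the surrounding diff.
void cloneAndPruneVtSegments(
    Map<GUID, Segment> vtSegments,
    PartitionTracker destProducerTracker,
    long earliestAllowableTimestamp) {
  Iterator<Map.Entry<GUID, Segment>> iterator = vtSegments.entrySet().iterator();
  while (iterator.hasNext()) {
    Map.Entry<GUID, Segment> entry = iterator.next();
    if (entry.getValue().getLastRecordTimestamp() >= earliestAllowableTimestamp) {
      destProducerTracker.setSegment(PartitionTracker.VERSION_TOPIC, entry.getKey(), new Segment(entry.getValue()));
    } else {
      iterator.remove(); // removal through the iterator, so no ConcurrentModificationException
    }
  }
}

Calling PartitionTracker.clearExpiredStateAndUpdateOffsetRecord directly, as the comment suggests, would avoid duplicating even this much logic.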

}
}
destProducerTracker.updateLatestConsumedVtPosition(latestConsumedVtPosition.get());
}

/**
* Clone the rtSegments to the destination PartitionTracker. Filter by brokerUrl. May be called concurrently.
*/
public void cloneRtProducerStates(PartitionTracker destProducerTracker, String brokerUrl) {
for (Map.Entry<String, VeniceConcurrentHashMap<GUID, Segment>> entry: rtSegments.entrySet()) {
if (!brokerUrl.isEmpty() && !brokerUrl.equals(entry.getKey())) {
continue; // filter by brokerUrl if specified
public void cloneRtProducerStates(PartitionTracker destProducerTracker, String brokerUrl, long maxAgeInMs) {
long earliestAllowableTimestamp = maxAgeInMs == DISABLED ? DISABLED : new Date().getTime() - maxAgeInMs;
for (Map.Entry<String, VeniceConcurrentHashMap<GUID, Segment>> broker2Segment: rtSegments.entrySet()) {
if (!brokerUrl.equals(broker2Segment.getKey())) {
continue; // filter by the specified brokerUrl
}
for (Map.Entry<GUID, Segment> rtEntry: entry.getValue().entrySet()) {
destProducerTracker.setSegment(
TopicType.of(TopicType.REALTIME_TOPIC_TYPE, entry.getKey()),
rtEntry.getKey(),
new Segment(rtEntry.getValue()));

final VeniceConcurrentHashMap<GUID, Segment> rtEntries = broker2Segment.getValue();
for (Map.Entry<GUID, Segment> rtEntry: rtEntries.entrySet()) {
if (rtEntry.getValue().getLastRecordTimestamp() >= earliestAllowableTimestamp) {
TopicType realTimeTopicType = TopicType.of(TopicType.REALTIME_TOPIC_TYPE, broker2Segment.getKey());
destProducerTracker.setSegment(realTimeTopicType, rtEntry.getKey(), new Segment(rtEntry.getValue()));
} else {
rtEntries.remove(rtEntry.getKey()); // The state is eligible to be cleared.
Ditto (the same concern as above about removing map entries while iterating applies here).

}
}
if (broker2Segment.getValue().isEmpty()) {
rtSegments.remove(broker2Segment.getKey());
}
}
}
@@ -711,7 +724,7 @@ public void checkMissingMessage(
}

void clearExpiredStateAndUpdateOffsetRecord(TopicType type, OffsetRecord offsetRecord, long maxAgeInMs) {
long minimumRequiredRecordProducerTimestamp = offsetRecord.getMaxMessageTimeInMs() - maxAgeInMs;
long minimumRequiredRecordProducerTimestamp = offsetRecord.calculateLatestMessageTimeInMs() - maxAgeInMs;
int numberOfClearedGUIDs = 0;
Iterator<Map.Entry<GUID, Segment>> iterator = getSegments(type).entrySet().iterator();
Map.Entry<GUID, Segment> entry;
@@ -747,14 +760,15 @@ public void removeProducerState(TopicType type, GUID guid, OffsetRecord offsetRe
}

@VisibleForTesting
Map<String, Map<GUID, Segment>> getAllRtSegmentsForTesting() {
return rtSegments.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> Collections.unmodifiableMap(entry.getValue())));
VeniceConcurrentHashMap<String, VeniceConcurrentHashMap<GUID, Segment>> getRtSegmentsForTesting() {
return rtSegments;
// return rtSegments.entrySet()
// .stream()
// .collect(Collectors.toMap(Map.Entry::getKey, entry -> Collections.unmodifiableMap(entry.getValue())));
}

@VisibleForTesting
Map<GUID, Segment> getVtSegmentsForTesting() {
VeniceConcurrentHashMap<GUID, Segment> getVtSegmentsForTesting() {
return vtSegments;
}

@@ -737,6 +737,31 @@ public void testShouldSyncOffsetFromSnapshot() throws InterruptedException {
doReturn(ControlMessageType.START_OF_SEGMENT.getValue()).when(mockControlMessage).getControlMessageType();
assertFalse(
mockIngestionTask.shouldSyncOffsetFromSnapshot(nonSegmentControlMessage, mockPartitionConsumptionState));

// Mock the getSyncBytesInterval method to return a specific value
doReturn(1000L).when(mockIngestionTask).getSyncBytesInterval(any());

// Create a mock message that is neither a Global RT DIV nor a control message
final DefaultPubSubMessage regularMessage = getMockMessage(3).getMessage();
KafkaKey regularMockKey = regularMessage.getKey();
doReturn(false).when(regularMockKey).isGlobalRtDiv();
doReturn(false).when(regularMockKey).isControlMessage();

// Test case 1: When processedRecordSizeSinceLastSync is less than 2*syncBytesInterval
doReturn(1500L).when(mockPartitionConsumptionState).getProcessedRecordSizeSinceLastSync();
assertFalse(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState));

// Test case 2: When processedRecordSizeSinceLastSync is equal to 2*syncBytesInterval
doReturn(2000L).when(mockPartitionConsumptionState).getProcessedRecordSizeSinceLastSync();
assertTrue(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState));

// Test case 3: When processedRecordSizeSinceLastSync is greater than 2*syncBytesInterval
doReturn(2500L).when(mockPartitionConsumptionState).getProcessedRecordSizeSinceLastSync();
assertTrue(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState));

// Test case 4: When syncBytesInterval is 0 (disabled)
doReturn(0L).when(mockIngestionTask).getSyncBytesInterval(any());
assertFalse(mockIngestionTask.shouldSyncOffsetFromSnapshot(regularMessage, mockPartitionConsumptionState));
}

@Test
@@ -378,8 +378,8 @@ private static DefaultPubSubMessage buildRecord(
messageEnvelope.leaderMetadataFooter.upstreamPubSubPosition = PubSubSymbolicPosition.EARLIEST.toWireFormatBuffer();
messageEnvelope.payloadUnion = payload;

if (offsetRecord != null && offsetRecord.getMaxMessageTimeInMs() < brokerTimestamp) {
when(offsetRecord.getMaxMessageTimeInMs()).thenReturn(brokerTimestamp);
if (offsetRecord != null && offsetRecord.calculateLatestMessageTimeInMs() < brokerTimestamp) {
when(offsetRecord.calculateLatestMessageTimeInMs()).thenReturn(brokerTimestamp);
}

return new ImmutablePubSubMessage(