From cc7555d9b7172262aac89a14e22c13b11a6c39a6 Mon Sep 17 00:00:00 2001
From: Steven van Rossum
Date: Mon, 1 Sep 2025 18:37:05 +0000
Subject: [PATCH] Only retrieve the shared caches once during setup in
 ReadFromKafkaDoFn

---
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java  | 67 +++++++++++++++----
 1 file changed, 55 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 70015847e19d..f9102b193265 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -78,7 +78,10 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -316,6 +319,16 @@ public Consumer<byte[], byte[]> load(
   private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>>>
       pollConsumerCacheSupplier;
 
+  private transient @MonotonicNonNull LoadingCache<KafkaSourceDescriptor, MovingAvg>
+      avgRecordSizeCache;
+
+  private transient @MonotonicNonNull LoadingCache<
+          KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
+      latestOffsetEstimatorCache;
+
+  private transient @MonotonicNonNull LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>>
+      pollConsumerCache;
+
   // Valid between bundle start and bundle finish.
   private transient @Nullable Deserializer<K> keyDeserializerInstance = null;
   private transient @Nullable Deserializer<V> valueDeserializerInstance = null;
@@ -433,9 +446,12 @@ private void refresh() {
   }
 
   @GetInitialRestriction
+  @RequiresNonNull({"pollConsumerCache"})
   public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
-    final Consumer<byte[], byte[]> consumer =
-        pollConsumerCacheSupplier.get().getUnchecked(kafkaSourceDescriptor);
+    final LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache =
+        this.pollConsumerCache;
+
+    final Consumer<byte[], byte[]> consumer = pollConsumerCache.getUnchecked(kafkaSourceDescriptor);
     final long startOffset;
     final long stopOffset;
 
@@ -513,12 +529,16 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
   }
 
   @GetSize
+  @RequiresNonNull({"avgRecordSizeCache", "latestOffsetEstimatorCache"})
   public double getSize(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange) {
+    final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
+        this.avgRecordSizeCache;
+
     // If present, estimates the record size to offset gap ratio. Compacted topics may hold less
     // records than the estimated offset range due to record deletion within a partition.
     final @Nullable MovingAvg avgRecordSize =
-        avgRecordSizeCacheSupplier.get().getIfPresent(kafkaSourceDescriptor);
+        avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
     // The tracker estimates the offset range by subtracting the last claimed position from the
     // currently observed end offset for the partition belonging to this split.
     final double estimatedOffsetRange =
@@ -533,8 +553,12 @@ public double getSize(
   }
 
   @NewTracker
+  @RequiresNonNull({"latestOffsetEstimatorCache"})
   public OffsetRangeTracker restrictionTracker(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
+    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
+        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
     }
@@ -543,22 +567,28 @@ public OffsetRangeTracker restrictionTracker(
     // so we want to minimize the amount of connections that we start and track with Kafka. Another
     // point is that it has a memoized backlog, and this should make that more reusable estimations.
     return new GrowableOffsetRangeTracker(
-        restriction.getFrom(),
-        latestOffsetEstimatorCacheSupplier.get().getUnchecked(kafkaSourceDescriptor));
+        restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
   }
 
   @ProcessElement
+  @RequiresNonNull({"avgRecordSizeCache", "latestOffsetEstimatorCache", "pollConsumerCache"})
   public ProcessContinuation processElement(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor,
       RestrictionTracker<OffsetRange, Long> tracker,
       WatermarkEstimator<Instant> watermarkEstimator,
       MultiOutputReceiver receiver)
       throws Exception {
-    final MovingAvg avgRecordSize = avgRecordSizeCacheSupplier.get().get(kafkaSourceDescriptor);
+    final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
+        this.avgRecordSizeCache;
+    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
+        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache =
+        this.pollConsumerCache;
+
+    final MovingAvg avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
     final KafkaLatestOffsetEstimator latestOffsetEstimator =
-        latestOffsetEstimatorCacheSupplier.get().get(kafkaSourceDescriptor);
-    final Consumer<byte[], byte[]> consumer =
-        pollConsumerCacheSupplier.get().get(kafkaSourceDescriptor);
+        latestOffsetEstimatorCache.get(kafkaSourceDescriptor);
+    final Consumer<byte[], byte[]> consumer = pollConsumerCache.get(kafkaSourceDescriptor);
     final Deserializer<K> keyDeserializerInstance =
         Preconditions.checkStateNotNull(this.keyDeserializerInstance);
     final Deserializer<V> valueDeserializerInstance =
@@ -734,7 +764,12 @@ public Coder<OffsetRange> restrictionCoder() {
   }
 
   @Setup
+  @EnsuresNonNull({"avgRecordSizeCache", "latestOffsetEstimatorCache", "pollConsumerCache"})
   public void setup() throws Exception {
+    avgRecordSizeCache = avgRecordSizeCacheSupplier.get();
+    latestOffsetEstimatorCache = latestOffsetEstimatorCacheSupplier.get();
+    pollConsumerCache = pollConsumerCacheSupplier.get();
+
     keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true);
     valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false);
     if (checkStopReadingFn != null) {
@@ -743,7 +778,15 @@ public void setup() throws Exception {
   }
 
   @Teardown
+  @RequiresNonNull({"avgRecordSizeCache", "latestOffsetEstimatorCache", "pollConsumerCache"})
   public void teardown() throws Exception {
+    final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
+        this.avgRecordSizeCache;
+    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
+        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache =
+        this.pollConsumerCache;
+
     try {
       if (valueDeserializerInstance != null) {
         Closeables.close(valueDeserializerInstance, true);
@@ -761,9 +804,9 @@ public void teardown() throws Exception {
     }
 
     // Allow the cache to perform clean up tasks when this instance is about to be deleted.
-    avgRecordSizeCacheSupplier.get().cleanUp();
-    latestOffsetEstimatorCacheSupplier.get().cleanUp();
-    pollConsumerCacheSupplier.get().cleanUp();
+    avgRecordSizeCache.cleanUp();
+    latestOffsetEstimatorCache.cleanUp();
+    pollConsumerCache.cleanUp();
   }
 
   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
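
The sketch below is not part of the patch. It is a minimal, standalone illustration of the Checker Framework pattern the patch relies on: a transient @MonotonicNonNull field is populated once in a method annotated with @EnsuresNonNull, and methods annotated with @RequiresNonNull copy the field into a final local before using it, mirroring how ReadFromKafkaDoFn now resolves its shared Guava LoadingCaches once in @Setup. The class and member names (SharedCacheExample, sizeCache, estimateSize) are hypothetical and chosen only for the example.

// Standalone sketch (not Beam code) of the @MonotonicNonNull / @EnsuresNonNull /
// @RequiresNonNull pattern used by the patch, with a Guava LoadingCache.
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;

public class SharedCacheExample {

  // Once assigned, a @MonotonicNonNull field never goes back to null.
  private transient @MonotonicNonNull LoadingCache<String, Long> sizeCache;

  // Tells the nullness checker that the field is non-null after setup() returns.
  @EnsuresNonNull({"sizeCache"})
  public void setup() {
    // Retrieve (or build) the shared cache exactly once instead of on every call.
    sizeCache =
        CacheBuilder.newBuilder()
            .build(
                new CacheLoader<String, Long>() {
                  @Override
                  public Long load(String key) {
                    return (long) key.length();
                  }
                });
  }

  // Callers may only invoke this after setup(); the checker enforces the contract.
  @RequiresNonNull({"sizeCache"})
  public long estimateSize(String key) {
    // Copy the field into a final local before use, as the patch does for each cache.
    final LoadingCache<String, Long> sizeCache = this.sizeCache;
    return sizeCache.getUnchecked(key);
  }

  public static void main(String[] args) {
    SharedCacheExample example = new SharedCacheExample();
    example.setup();
    System.out.println(example.estimateSize("kafka-topic-0"));
  }
}

A common reason for the local copy is that the checker treats a final local as stable for the rest of the method body, whereas repeated reads of a non-final field are harder for it (and for readers) to reason about; it also keeps each DoFn method to a single field read, which is the point of retrieving the shared caches once during setup.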