From d7753c3f3c727fae2a5f31b5c3519826c8f42608 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sun, 24 Nov 2024 02:53:55 +0000 Subject: [PATCH] debug info --- .../clients/producer/internals/RecordAccumulator.java | 7 ++++++- clients/src/test/resources/log4j.properties | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index cd29f64d5ee60..8bee185002eff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -644,6 +644,7 @@ private void insertInSequenceOrder(Deque deque, ProducerBatch bat private long batchReady(boolean exhausted, TopicPartition part, Node leader, long waitedTimeMs, boolean backingOff, int backoffAttempts, boolean full, long nextReadyCheckDelayMs, Set readyNodes) { + log.debug("------" + readyNodes); if (!readyNodes.contains(leader) && !isMuted(part)) { long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs; boolean expired = waitedTimeMs >= timeToWaitMs; @@ -664,6 +665,7 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } + log.debug("------" + readyNodes); return nextReadyCheckDelayMs; } @@ -767,9 +769,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin --queueSizesIndex; } } - + log.debug("zzzzz" + readyNodes.toString()); nextReadyCheckDelayMs = batchReady(exhausted, part, leader, waitedTimeMs, backingOff, backoffAttempts, full, nextReadyCheckDelayMs, readyNodes); + log.debug("222zzzzz" + readyNodes.toString()); } } @@ -803,6 +806,7 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin */ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) { Set readyNodes = new HashSet<>(); + log.debug("kkkkk " + readyNodes); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set unknownLeaderTopics = new HashSet<>(); // Go topic by topic so that we can get queue sizes for partitions in a topic and calculate @@ -811,6 +815,7 @@ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) { final String topic = topicInfoEntry.getKey(); nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics); } + log.debug("2222kkkkk " + readyNodes); return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } diff --git a/clients/src/test/resources/log4j.properties b/clients/src/test/resources/log4j.properties index 0992580eca1d8..1e918e25c06a4 100644 --- a/clients/src/test/resources/log4j.properties +++ b/clients/src/test/resources/log4j.properties @@ -19,5 +19,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.org.apache.kafka=ERROR +log4j.logger.org.apache.kafka.clients.producer.internals=DEBUG # We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO