diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index cd29f64d5ee6..55615b1ddafa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -644,10 +644,12 @@ private void insertInSequenceOrder(Deque deque, ProducerBatch bat private long batchReady(boolean exhausted, TopicPartition part, Node leader, long waitedTimeMs, boolean backingOff, int backoffAttempts, boolean full, long nextReadyCheckDelayMs, Set readyNodes) { + log.debug("------" + readyNodes); if (!readyNodes.contains(leader) && !isMuted(part)) { long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs; boolean expired = waitedTimeMs >= timeToWaitMs; boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting(); + log.debug("------========" + full + expired + exhausted + closed + flushesInProgress + transactionCompleting); boolean sendable = full || expired || exhausted @@ -664,6 +666,7 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } + log.debug("------" + readyNodes); return nextReadyCheckDelayMs; } @@ -745,6 +748,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin backoffAttempts = batch.attempts(); dequeSize = deque.size(); full = dequeSize > 1 || batch.isFull(); + // Q: why is full? + boolean k = dequeSize > 1; + log.debug("dequeSize > 1" + k); + log.debug("batch.isFull()" + batch.isFull()); } if (leader == null) { @@ -767,9 +774,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin --queueSizesIndex; } } - + log.debug("zzzzz" + readyNodes.toString()); nextReadyCheckDelayMs = batchReady(exhausted, part, leader, waitedTimeMs, backingOff, backoffAttempts, full, nextReadyCheckDelayMs, readyNodes); + log.debug("222zzzzz" + readyNodes.toString()); } } @@ -803,6 +811,7 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin */ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) { Set readyNodes = new HashSet<>(); + log.debug("kkkkk " + readyNodes); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set unknownLeaderTopics = new HashSet<>(); // Go topic by topic so that we can get queue sizes for partitions in a topic and calculate @@ -811,6 +820,7 @@ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) { final String topic = topicInfoEntry.getKey(); nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics); } + log.debug("2222kkkkk " + readyNodes); return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } diff --git a/clients/src/test/resources/log4j.properties b/clients/src/test/resources/log4j.properties index 0992580eca1d..1e918e25c06a 100644 --- a/clients/src/test/resources/log4j.properties +++ b/clients/src/test/resources/log4j.properties @@ -19,5 +19,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.org.apache.kafka=ERROR +log4j.logger.org.apache.kafka.clients.producer.internals=DEBUG # We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO