Skip to content

Commit

Permalink
deubg info
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Nov 24, 2024
1 parent bcd46cf commit 647eefb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,12 @@ private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch bat
private long batchReady(boolean exhausted, TopicPartition part, Node leader,
long waitedTimeMs, boolean backingOff, int backoffAttempts,
boolean full, long nextReadyCheckDelayMs, Set<Node> readyNodes) {
log.debug("------" + readyNodes);
if (!readyNodes.contains(leader) && !isMuted(part)) {
long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs;
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
log.debug("------========" + full + expired + exhausted + closed + flushesInProgress + transactionCompleting);
boolean sendable = full
|| expired
|| exhausted
Expand All @@ -664,6 +666,7 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
log.debug("------" + readyNodes);
return nextReadyCheckDelayMs;
}

Expand Down Expand Up @@ -745,6 +748,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
backoffAttempts = batch.attempts();
dequeSize = deque.size();
full = dequeSize > 1 || batch.isFull();
// Q: why is full?
boolean k = dequeSize > 1;
log.debug("dequeSize > 1" + k);
log.debug("batch.isFull()" + batch.isFull());
}

if (leader == null) {
Expand All @@ -767,9 +774,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
--queueSizesIndex;
}
}

log.debug("zzzzz" + readyNodes.toString());
nextReadyCheckDelayMs = batchReady(exhausted, part, leader, waitedTimeMs, backingOff,
backoffAttempts, full, nextReadyCheckDelayMs, readyNodes);
log.debug("222zzzzz" + readyNodes.toString());
}
}

Expand Down Expand Up @@ -803,6 +811,7 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
*/
public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
log.debug("kkkkk " + readyNodes);
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
// Go topic by topic so that we can get queue sizes for partitions in a topic and calculate
Expand All @@ -811,6 +820,7 @@ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
final String topic = topicInfoEntry.getKey();
nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
}
log.debug("2222kkkkk " + readyNodes);
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

Expand Down
1 change: 1 addition & 0 deletions clients/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.kafka.clients.producer.internals=DEBUG
# We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper
log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO

0 comments on commit 647eefb

Please sign in to comment.