Skip to content

Commit

Permalink
debug info
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Nov 24, 2024
1 parent bcd46cf commit d7753c3
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch bat
private long batchReady(boolean exhausted, TopicPartition part, Node leader,
long waitedTimeMs, boolean backingOff, int backoffAttempts,
boolean full, long nextReadyCheckDelayMs, Set<Node> readyNodes) {
log.debug("------" + readyNodes);
if (!readyNodes.contains(leader) && !isMuted(part)) {
long timeToWaitMs = backingOff ? retryBackoff.backoff(backoffAttempts > 0 ? backoffAttempts - 1 : 0) : lingerMs;
boolean expired = waitedTimeMs >= timeToWaitMs;
Expand All @@ -664,6 +665,7 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
log.debug("------" + readyNodes);
return nextReadyCheckDelayMs;
}

Expand Down Expand Up @@ -767,9 +769,10 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
--queueSizesIndex;
}
}

log.debug("zzzzz" + readyNodes.toString());
nextReadyCheckDelayMs = batchReady(exhausted, part, leader, waitedTimeMs, backingOff,
backoffAttempts, full, nextReadyCheckDelayMs, readyNodes);
log.debug("222zzzzz" + readyNodes.toString());
}
}

Expand Down Expand Up @@ -803,6 +806,7 @@ private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, Strin
*/
public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
log.debug("kkkkk " + readyNodes);
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
// Go topic by topic so that we can get queue sizes for partitions in a topic and calculate
Expand All @@ -811,6 +815,7 @@ public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
final String topic = topicInfoEntry.getKey();
nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
}
log.debug("2222kkkkk " + readyNodes);
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

Expand Down
1 change: 1 addition & 0 deletions clients/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.kafka.clients.producer.internals=DEBUG
# We are testing for a particular INFO log message in CommonNameLoggingTrustManagerFactoryWrapper
log4j.logger.org.apache.kafka.common.security.ssl.CommonNameLoggingTrustManagerFactoryWrapper=INFO

0 comments on commit d7753c3

Please sign in to comment.