public static final KafkaConsumer<String, String> KAFKA_CONSUMER;
public static final ExecutorService CONCURRENT_CONUSMER;
public static void main(String[] args) {
KAFKA_CONSUMER.subscribe(Lists.newArrayList("test_topic"));
while (true) {
ConsumerRecords<String, String> poll = KAFKA_CONSUMER.poll(Duration.ofMillis(1000));
Iterator<ConsumerRecord<String, String>> iterator = poll.iterator();
if (iterator.hasNext()) {
log.info("message:{}", iterator.next());
}
try {
KAFKA_CONSUMER.commitSync();
} catch (Exception e) {
log.error("提交offset失败");
}
}
}
static {
KAFKA_CONSUMER = new KafkaConsumer<>(createConsumerProperties());
CONCURRENT_CONUSMER = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() / 2, Runtime.getRuntime().availableProcessors(),
3, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder()
.setNameFormat("original-consumer-%d")
.build(), new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 创建消费者配置
*
* @return
*/
private static Properties createConsumerProperties() {
Properties properties = new Properties();
//bootstrap-servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
//group id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "learning-kafka-consumer-group");
//key反序列化方式
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//value反序列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//开启自动commit offet
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//每1s提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//客户端id
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "origin-consumer");
return properties;
}
- Kafka Consumer基于Poll方法去从服务端拉取数据 ,每次消费的offset也是由消费者控制,以前offset是由Zookeeper控制的,新版本将offset 上传至_consumer_offset控制,也就是交给Kafka服务端控制 。Kafka也支持开发人员将offset记录在其他持久化平台,例如为了保证全局消息顺序一致而将offset防止一个全局的kv结构数据库。
- At most once:消息可能会丢,但不会重复传递
- At least once:消息决不会丢,但可能会重复
- Exactly once:每条消息只会被传递一次
- 生产者保证不会产生重复消息,消费端保证不能重复拉取相同的消息。
- 每个分区只有一个生产者写入消息 ,当出现异常或超时的情况时,生产者就要查询此分区的最后一个消息,用来决定后续操作是消息重传还是继续发送。
- 为每个消息添加一个全局唯一主键,生产者不做其他特殊处理,按照之前分析方式进行重传,由消费者对消息进行去重,实现"Exactly Once"语义。
- commit Sync同步提交offset
- commitAsync异步提交Offset
如果提交方式控制不当可能会导致,重复消费(at least once)以及丢失i消息(at most once)的情况。
消费者将关闭自动提交offset的功能且不再手动提交offset,这样就不使用Offsets Topic这个内部Topic记录其offset,而是由消费者自己保存offset。这里利用事务的原子性来实现“Exactly once”语义,我们将offset和消息处理结果放在一个事务中,事务执行成功则认为此消息被消费,否则事务回滚需要重新消费。当出现消费者宕机重启或Rebalance操作时,消费者可以从关系型数据库中找到对应的offset,然后调用KafkaConsumer.seek()方法手动设置消费位置,从此offset处开始继续消费。
/**
* @fileName: HandleRebalance.java
* @description: 监听再均衡
* @author: by echo huang
* @date: 2020-04-26 23:39
*/
@Slf4j
public class HandleRebalance implements ConsumerRebalanceListener {
private KafkaConsumer<String, String> consumer;
private Map<TopicPartition, OffsetAndMetadata> currentOffset;
public HandleRebalance(KafkaConsumer<String, String> consumer, Map<TopicPartition, OffsetAndMetadata> currentOffset) {
this.consumer = consumer;
this.currentOffset = currentOffset;
}
/**
* 再均衡开始之前和消费者停止读取消息之后被调用。如果这里提交offset,下一个接管分区的consumer就知道从哪里开始消费了
*
* @param partitions
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.info("再均衡之前,consumer停止消费后提交offset给服务器");
consumer.commitSync(currentOffset);
}
/**
* 在重新分配分区之后和消费者开始读取消息之前被调用
*
* @param partitions
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info(" 在重新分配分区之后和消费者开始读取消息之前被调用");
}
}
- 每个Consumer Group在Zookeeper下都维护了一个"/consumers/[group+id]/ids", 在此路径下使用临时节点记录属于此Consumer Group的消费者的Id ,由 Consumer启动时创建 。还有两个和ids同级别的节点 owners节点 ,记录了分区与消费者的对应关系;offsets节点,记录了此Consumer Group在某个分区上的消费位置 。
- 每个Broker、Topic以及分区在Zookeeper中也都对应一个路径,如下所示。
- /brokers/ids/broker_id:记录了host、port以及分配在此Broker上的Topic的分区列表。
- /brokers/topics/[topic_name]:记录了每个Partition的Leader、ISR等信息。
- /brokers/topics/[topic_name]/partitions/[partition_num]/state:记录了当前Leader、选举epoch等信息。
-
当路径下的子节点发生变化时,Zookeeper基于该子节点注册的Watcher就可以通知到客户端。
-
羊群效应(Herd Effect) :先解释一下什么是“羊群效应”,一个被Watch的ZooKeeper节点变化,导致大量的Watcher通知需要被发送给客户端,这将导致在通知期间其他操作延迟。一般出现这种情况的主要原因就是没有找到客户端真正的关注点,也算是滥用Watcher的一种场景。继续前面的分析,任何Broker或Consumer加入或退出,都会向其余所有的Consumer发送Watcher通知触发Rebalance,就出现了“羊群效应”
-
脑裂(Split Brain) :每个Consumer都是通过ZooKeeper中保存的这些元数据判断Consumer Group状态、Broker的状态以及Rebalance结果的,由于ZooKeeper只保证“最终一致性”,不保证“Simultaneously Consistent Cross-Client Views”,不同Consumer在同一时刻可能连接到ZooKeeper集群中不同的服务器,看到的元数据就可能不一样,这就会造成不正确的Rebalance尝试。
-
- 将全部的Consumer Group分成多个子集 ,每个 Consumer Group子集在服务端对应一个GroupCoordinator对其进行管理 , GroupCoordinator是KafkaServer中用于管理Consumer Group的组件 。消费者不再依赖ZooKeeper,而只有GroupCoordinator在ZooKeeper上添加Watcher。 消费者在加入或退出Consumer Group时会修改ZooKeeper中保存的元数据 ,这点与上文描述的方案一类似,此时会触发GroupCoordinator设置的Watcher,通知GroupCoordinator开始Rebalance操作 。
- 当前消费者准备加入某Consumer Group或是GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的网络位置,消费者会向Kafka集群中的任一Broker发送ConsumerMetadataRequest,此请求中包含了其 Consumer Group的GroupId ,收到请求的Broker会返回ConsumerMetadataResponse作为响应,其中包含了管理此 Consumer Group的GroupCoordinator 的相关信息。
- 消费者根据ConsumerMetadataResponse中的GroupCoordinator信息, 连接到GroupCoordinator并周期性地发送HeartbeatRequest 。发送 HeartbeatRequest的主要作用是为了告诉GroupCoordinator此消费者正常在线 ,GroupCoordinator会认为长时间未发送HeartbeatRequest的消费者已经下线, 触发新一轮的Rebalance操作 。
- 如果 HeartbeatResponse中带有IllegalGeneration异常,说明GroupCoordinator发起了Rebalance操作 ,此时消费者 发送JoinGroupRequest给GroupCoordinator ,JoinGroupRequest的主要目的是为了通知GroupCoordinator,当前消费者要加入指定的Consumer Group 。之后,GroupCoordinator 会根据收到的JoinGroupRequest和ZooKeeper中的元数据完成对此Consumer Group的分区分配 。
- 消费者成功成为Consumer Group的成员后,会周期性发送HeartbeatRequest。如果 HeartbeatResponse包含IllegalGeneration异常,则执行上一步骤。如果找不到对应的GroupCoordinator( HeartbeatResponse包含NotCoordinatorForGroup异常 ),则周期性地执行步骤1,直至成功。
- 分区分配的操作是在服务端的 GroupCoordinator中完成的 ,这就要求服务端实现Partition的分配策略 。当要使用新的Partition分配策略时,就必须修改服务端的代码或配置,之后重启服务。
- 不同的Rebalance策略有不同的验证需求。当需要自定义分区策略和验证需求时比较繁琐。
- 在Kafka0.9版本后,将分区分配的工作放到了消费者这一段进行处理,而Consumer Group管理工作还是由GroupCoordinator处理。
- JoinGroupRequest的处理过程拆分成了两个阶段,分别是 Join Group阶段和Synchronizing Group State阶段 。
- 当消费者查找到管理当前Consumer Group的GroupCoordinator后,就会进入Join Group阶段 ,Consumer首先向GroupCoordinator发送 JoinGroupRequest 请求,其中包含消费者的相关信息; 服务端的GroupCoordinator收到JoinGroupRequest后会暂存消息,收集到全部消费者之后,根据JoinGroupRequest中的信息来确定Consumer Group中可用的消费者,从中选取一个消费者成为Group Leader,还会选取使用的分区分配策略,最后将这些信息封装成JoinGroupResponse返回给消费者 。
- 只有 Group Leader可以收到JoinGroupResponse中封装的所有消费者的信息 ,当消费者确认自己是Group Leader后,会根据消费者的信息以及选定的分区分配策略进行分区分配。
- 在 Synchronizing Group State阶段 ,每个消费者会发送SyncGroupRequest到GroupCoordinator ,但是只有Group Leader的SyncGroupRequest请求包含了分区的分配结果,GroupCoordinator根据Group Leader的分区分配结果,形成SyncGroupResponse返回给所有Consumer 。消费者收到SyncGroupResponse后进行解析, 即可获取分配给自身的分区。 具体协议的细节在源码分析过程介绍。以上是新版本协议的核心变动,心跳检测、故障转移等方面并未改动。
/**
* 分配
* @see KafkaConsumer#assignment()
*/
public Set<TopicPartition> assignment();
/**
* 订阅
* @see KafkaConsumer#subscription()
*/
public Set<String> subscription();
/**
* 订阅topic
* @see KafkaConsumer#subscribe(Collection)
*/
public void subscribe(Collection<String> topics);
/**
* 订阅topic并且注册再均衡监听器
* @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
*/
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
/**
* 手动订阅指定Topic,并且指定消费的分区。
* @see KafkaConsumer#assign(Collection)
*/
public void assign(Collection<TopicPartition> partitions);
/**
* @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
*/
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
/**
* @see KafkaConsumer#unsubscribe()
*/
public void unsubscribe();
/**
* 拉取数据
* @see KafkaConsumer#poll(long)
*/
public ConsumerRecords<K, V> poll(long timeout);
/**
* 同步提交offset
* @see KafkaConsumer#commitSync()
*/
public void commitSync();
/**
* @see KafkaConsumer#commitSync(Map)
*/
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
/**
* @see KafkaConsumer#commitAsync()
*/
public void commitAsync();
/**
* @see KafkaConsumer#commitAsync(OffsetCommitCallback)
*/
public void commitAsync(OffsetCommitCallback callback);
/**
* @see KafkaConsumer#commitAsync(Map, OffsetCommitCallback)
*/
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
/**
* 指定消费者的指定位置开始消费
* @see KafkaConsumer#seek(TopicPartition, long)
*/
public void seek(TopicPartition partition, long offset);
/**
* 指定分区从头开始消费
* @see KafkaConsumer#seekToBeginning(Collection)
*/
public void seekToBeginning(Collection<TopicPartition> partitions);
/**
* 指定分区从尾部开始消费
* @see KafkaConsumer#seekToEnd(Collection)
*/
public void seekToEnd(Collection<TopicPartition> partitions);
/**
* 得到指定分区的offset
* @see KafkaConsumer#position(TopicPartition)
*/
public long position(TopicPartition partition);
/**
* @see KafkaConsumer#committed(TopicPartition)
*/
public OffsetAndMetadata committed(TopicPartition partition);
/**
* @see KafkaConsumer#metrics()
*/
public Map<MetricName, ? extends Metric> metrics();
/**
* 得到指定topic的分区信息
* @see KafkaConsumer#partitionsFor(String)
*/
public List<PartitionInfo> partitionsFor(String topic);
/**
* @see KafkaConsumer#listTopics()
*/
public Map<String, List<PartitionInfo>> listTopics();
/**
* 暂停后,poll会返回空
* @see KafkaConsumer#paused()
*/
public Set<TopicPartition> paused();
/**
* 暂停后,poll会返回空
* @see KafkaConsumer#pause(Collection)
*/
public void pause(Collection<TopicPartition> partitions);
/**
* 恢复
* @see KafkaConsumer#resume(Collection)
*/
public void resume(Collection<TopicPartition> partitions);
/**
* @see KafkaConsumer#close()
*/
public void close();
/**
* @see KafkaConsumer#wakeup()
*/
public void wakeup();
- 类似于NetworkClient依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector实现发送请求,通过一系列的handle*()方法处理请求响应、超时请求以及断线重连。
- ConsumerNetworkClient是基于NetworkClient之上的封装。
- poll有三个重载方法,最终都会调用poll(long timeout, long now, boolean executeDelayedTasks)
- timeout:执行poll方法最长阻塞时间
- now:当前时间戳;
- executeDelayedTasks:是否执行delayedTasks队列中的定时任务。
private void poll(long timeout, long now, boolean executeDelayedTasks) {
// send all the requests we can send now 在now发送全部请求,主要回去循环处理unsent中缓存的请求
trySend(now);
// ensure we don't poll any longer than the deadline for
// the next scheduled task
//计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定
timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
//调用NetworkClient的poll方法,超时时间传递计算的出来的超时时间
clientPoll(timeout, now);
//拿到发送完请求的后的当前时间戳
now = time.milliseconds();
// handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
//检查是否断开连接
checkDisconnects(now);
// execute scheduled tasks 如果executeDelayedTasks为true指定定时任务
if (executeDelayedTasks)
delayedTasks.poll(now);
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
trySend(now);
// fail requests that couldn't be sent if they have expired
//失败的请求如果它们已经过期不能再发送
failExpiredRequests(now);
}
- 首先执行trySend方法,会尝试将unsent中缓存的请求全部发送
private boolean trySend(long now) {
// send any requests that can be sent now
boolean requestsSent = false;
//遍历缓存中的全部请求
for (Map.Entry<Node, List<ClientRequest>> requestEntry : unsent.entrySet()) {
//拿到对应的node节点
Node node = requestEntry.getKey();
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
//遍历发送ClientRequest请求
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
//如果node就绪
if (client.ready(node, now)) {
//发送请求,将客户端请求放入InFlightRequests队列等待响应,也放入KafkaChannel的send字段中等待发送
client.send(request, now);
//从unsent缓存中删除
iterator.remove();
//请求发送设置为true
requestsSent = true;
}
}
}
return requestsSent;
}
- 计算超时时间,主要是和最近要执行的定时任务的时间取最小值
timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
//下一次执行任务的超时时间
public long nextTimeout(long now) {
//如果为null,表示最长
if (tasks.isEmpty())
return Long.MAX_VALUE;
else
//拿到最近要执行的任务的超时时间减去目前的时间与0取最大
return Math.max(tasks.peek().timeout - now, 0);
}
- 调用NetworkClient的poll方法,将KafkaChannel.send字段指定的消息发送出去,这里主要是trySend的数据。并且还会更新Metadata使用的一些列方法。
//调用NetworkClient的poll方法,超时时间传递计算的出来的超时时间
clientPoll(timeout, now);
private void clientPoll(long timeout, long now) {
client.poll(timeout, now);
//可能触发中断
maybeTriggerWakeup();
}
private void maybeTriggerWakeup() {
//如果不可中断的方法为0并且执行不可中断
if (wakeupDisabledCount == 0 && wakeup.get()) {
///设置为false并且抛出异常
wakeup.set(false);
throw new WakeupException();
}
}
networksclient.poll方法
handle*请求
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
- 检查连接状态
private void checkDisconnects(long now) {
// any disconnects affecting requests that have already been transmitted will be handled
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future
// and set the disconnect flag in the ClientResponse
//判断usent缓存中的每个node节点的连接状态
Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
Node node = requestEntry.getKey();
//如果连接失败移除该数据,并且将对应的请求通过请求完成处理器传递给客户端
if (client.connectionFailed(node)) {
// Remove entry before invoking request callback to avoid callbacks handling
// coordinator failures traversing the unsent list again.
iterator.remove();
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
handler.onComplete(new ClientResponse(request, now, true, null));
}
}
}
}
- 根据executeDelayedTasks决定是否处理delayTasks
// execute scheduled tasks 如果executeDelayedTasks为true指定定时任务
if (executeDelayedTasks)
delayedTasks.poll(now);
## delayedTask包括AutoCommitOffsetTask和HeartBeartTask
-
再次调用trySend方法,因为最开始调用了trySend,并且也调用了NetworkClient将KafkaChannel里的send数据发送出去了或者已经建立了连接,所以这里继续处理剩余请求
-
处理失败的请求或者超时的失败请求
private void failExpiredRequests(long now) {
// clear all expired unsent requests and fail their corresponding futures
//遍历unsent缓存
Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
while (requestIterator.hasNext()) {
ClientRequest request = requestIterator.next();
//如果请求已经超时 移除并且抛出移除
if (request.createdTimeMs() < now - unsentExpiryMs) {
//将异常放入请求完成处理器中并且移除请求
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
requestIterator.remove();
} else
break;
}
//如果请求集合为空,那么将其从unsent移除
if (requestEntry.getValue().isEmpty())
iterator.remove();
}
}
- 将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送。
/**
* 调用IO请求并且立即返回,这将不能触发中断,也不会执行任何延迟任务
* Poll for network IO and return immediately. This will not trigger wakeups,
* nor will it execute any delayed tasks.
*/
public void pollNoWakeup() {
//关闭中断处理请求,添加不可中断方法
disableWakeups();
try {
//立即处理数据,并且不允许阻塞
poll(0, time.milliseconds(), false);
} finally {
enableWakeups();
}
}
public void enableWakeups() {
if (wakeupDisabledCount <= 0)
throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
wakeupDisabledCount--;
// re-wakeup the client if the flag was set since previous wake-up call
// could be cleared by poll(0) while wakeups were disabled
//中断方法,唤醒当前阻塞IO
if (wakeupDisabledCount == 0 && wakeup.get())
this.client.wakeup();
}
public RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
AbstractRequest request) {
long now = time.milliseconds();
//创建请求完成处理器
RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
//根据api得到下一个请求的请求头
RequestHeader header = client.nextRequestHeader(api);
//创建发送请求数据
RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
//创建ClientRequest放入unsent缓冲区
put(node, new ClientRequest(now, true, send, future));
return future;
}
private void put(Node node, ClientRequest request) {
List<ClientRequest> nodeUnsent = unsent.get(node);
if (nodeUnsent == null) {
nodeUnsent = new ArrayList<>();
unsent.put(node, nodeUnsent);
}
nodeUnsent.add(request);
}
public static class RequestFutureCompletionHandler
extends RequestFuture<ClientResponse>
implements RequestCompletionHandler {
/**
* 处理客户端响应
* @param response
*/
@Override
public void onComplete(ClientResponse response) {
//如果断开连接
if (response.wasDisconnected()) {
//拿到请求
ClientRequest request = response.request();
//得到send请求得到请求头请求体
RequestSend send = request.request();
ApiKeys api = ApiKeys.forId(send.header().apiKey());
//拿到correlationId
int correlation = send.header().correlationId();
log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
api, request, correlation, send.destination());
//抛出异常
raise(DisconnectException.INSTANCE);
} else {
complete(response);
}
}
}
//表示当前请求是否完成,不管正常完成还是出现异常,此字段都会被设置为true
private boolean isDone = false;
//记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。”
private T value;
//记录导致请求异常完成的异常类,与value值互斥。此字段非空则表示出现异常,反之则表示正常完成。
private RuntimeException exception;
//“RequestFutureListener集合,用来监听请求完成的情况。
private List<RequestFutureListener<T>> listeners = new ArrayList<>();