diff --git a/CHANGELOG.md b/CHANGELOG.md
index de7f7056..efe60210 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog
+- 4.2.0
+ - Optimize consumer fetch latency.
+ Introduced the `share_leader_conn` consumer configuration option (default: `false`).
+ This setting allows users to opt for the previous behavior if preferred (set to `true`).
+
- 4.1.1
- Upgrade `kafka_protocol` from version 4.1.5 to 4.1.9.
diff --git a/src/brod.erl b/src/brod.erl
index ee866a5c..d7dbedfb 100644
--- a/src/brod.erl
+++ b/src/brod.erl
@@ -262,6 +262,7 @@
| {offset_reset_policy, brod_consumer:offset_reset_policy()}
| {size_stat_window, non_neg_integer()}
| {isolation_level, brod_consumer:isolation_level()}
+ | {share_leader_conn, boolean()}
].
%% Consumer configuration.
%%
diff --git a/src/brod_client.erl b/src/brod_client.erl
index 8f4aad24..d52f1b38 100644
--- a/src/brod_client.erl
+++ b/src/brod_client.erl
@@ -1,5 +1,5 @@
-%%%
%%% Copyright (c) 2015-2021 Klarna Bank AB (publ)
+%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
, get_group_coordinator/2
, get_transactional_coordinator/2
, get_leader_connection/3
+ , get_bootstrap/1
, get_metadata/2
, get_metadata_safe/2
, get_partitions_count/2
@@ -227,6 +228,10 @@ stop_consumer(Client, TopicName) ->
get_leader_connection(Client, Topic, Partition) ->
safe_gen_call(Client, {get_leader_connection, Topic, Partition}, infinity).
+-spec get_bootstrap(client()) -> {ok, brod:bootstrap()} | {error, any()}.
+get_bootstrap(Client) ->
+ safe_gen_call(Client, get_bootstrap, infinity).
+
%% @doc Get connection to a kafka broker.
%%
%% Return already established connection towards the broker,
@@ -388,6 +393,10 @@ handle_call({stop_consumer, Topic}, _From, State) ->
handle_call({get_leader_connection, Topic, Partition}, _From, State) ->
{Result, NewState} = do_get_leader_connection(State, Topic, Partition),
{reply, Result, NewState};
+handle_call(get_bootstrap, _From, State) ->
+ #state{bootstrap_endpoints = Endpoints} = State,
+ ConnConfig = conn_config(State),
+ {reply, {ok, {Endpoints, ConnConfig}}, State};
handle_call({get_connection, Host, Port}, _From, State) ->
{Result, NewState} = maybe_connect(State, {Host, Port}),
{reply, Result, NewState};
diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl
index b84e001f..a2647e69 100644
--- a/src/brod_consumer.erl
+++ b/src/brod_consumer.erl
@@ -1,4 +1,5 @@
%%% Copyright (c) 2014-2021 Klarna Bank AB (publ)
+%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
@@ -93,7 +94,10 @@
-type pending_acks() :: #pending_acks{}.
-type isolation_level() :: kpro:isolation_level().
--record(state, { bootstrap :: pid() | brod:bootstrap()
+-define(GET_FROM_CLIENT, get).
+-define(IGNORE, ignore).
+-record(state, { client_pid :: ?IGNORE | pid()
+ , bootstrap :: ?IGNORE | ?GET_FROM_CLIENT | brod:bootstrap()
, connection :: ?undef | pid()
, topic :: binary()
, partition :: integer()
@@ -136,6 +140,7 @@
-define(INIT_CONNECTION, init_connection).
-define(DEFAULT_AVG_WINDOW, 5).
-define(DEFAULT_ISOLATION_LEVEL, ?kpro_read_committed).
+-define(DEFAULT_SHARE_LEADER_CONN, false).
%%%_* APIs =====================================================================
%% @equiv start_link(ClientPid, Topic, Partition, Config, [])
@@ -220,6 +225,16 @@ start_link(Bootstrap, Topic, Partition, Config) ->
%% and `read_committed' to get only the records from committed
%% transactions
%%
+%%
`share_leader_conn': (optional, default = `false')
+%%
+%% Whether or not share the partition leader connection with
+%% other producers or consumers.
+%% Set to `true' to consume less TCP connections towards Kafka,
+%% but may lead to higher fetch latency. This is because Kafka can
+%% ony accumulate messages for the oldest fetch request, later
+%% requests behind it may get blocked until `max_wait_time' expires
+%% for the oldest one
+%%
%%
%% @end
-spec start_link(pid() | brod:bootstrap(),
@@ -286,7 +301,7 @@ get_connection(Pid) ->
%%%_* gen_server callbacks =====================================================
-init({Bootstrap, Topic, Partition, Config}) ->
+init({Bootstrap0, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
Cfg = fun(Name, Default) ->
proplists:get_value(Name, Config, Default)
@@ -300,15 +315,33 @@ init({Bootstrap, Topic, Partition, Config}) ->
BeginOffset = Cfg(begin_offset, ?DEFAULT_BEGIN_OFFSET),
OffsetResetPolicy = Cfg(offset_reset_policy, ?DEFAULT_OFFSET_RESET_POLICY),
IsolationLevel = Cfg(isolation_level, ?DEFAULT_ISOLATION_LEVEL),
-
- %% If bootstrap is a client pid, register self to the client
- case is_shared_conn(Bootstrap) of
+ IsShareConn = Cfg(share_leader_conn, ?DEFAULT_SHARE_LEADER_CONN),
+
+ %% resolve connection bootstrap args
+ {ClientPid, Bootstrap} =
+ case is_pid(Bootstrap0) of
+ true when IsShareConn ->
+ %% share leader connection with other producers/consumers
+ %% the connection is to be managed by brod_client
+ {Bootstrap0, ?IGNORE};
+ true ->
+ %% not sharing leader connection with other producers/consumers
+ %% the bootstrap args will be resolved later when it's
+ %% time to establish a connection to partition leader
+ {Bootstrap0, ?GET_FROM_CLIENT};
+ false ->
+ %% this consumer process is not started from `brod' APIs
+ %% maybe managed by other supervisors.
+ {?IGNORE, Bootstrap0}
+ end,
+ case is_pid(ClientPid) of
true ->
ok = brod_client:register_consumer(Bootstrap, Topic, Partition);
false ->
ok
end,
- {ok, #state{ bootstrap = Bootstrap
+ {ok, #state{ client_pid = ClientPid
+ , bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, begin_offset = BeginOffset
@@ -418,20 +451,26 @@ handle_cast(Cast, State) ->
{noreply, State}.
%% @private
-terminate(Reason, #state{ bootstrap = Bootstrap
+terminate(Reason, #state{ client_pid = ClientPid
, topic = Topic
, partition = Partition
, connection = Connection
+ , connection_mref = Mref
}) ->
- IsShared = is_shared_conn(Bootstrap),
IsNormal = brod_utils:is_normal_reason(Reason),
%% deregister consumer if it's shared connection and normal shutdown
- IsShared andalso IsNormal andalso
- brod_client:deregister_consumer(Bootstrap, Topic, Partition),
- %% close connection if it's working standalone
- case not IsShared andalso is_pid(Connection) of
- true -> kpro:close_connection(Connection);
- false -> ok
+ case is_pid(ClientPid) andalso IsNormal of
+ true ->
+ brod_client:deregister_consumer(ClientPid, Topic, Partition);
+ false ->
+ ok
+ end,
+ %% close connection if it's owned by this consumer
+ case Mref =:= ?undef andalso is_pid(Connection) andalso is_process_alive(Connection) of
+ true ->
+ kpro:close_connection(Connection);
+ false ->
+ ok
end,
%% write a log if it's not a normal reason
IsNormal orelse ?BROD_LOG_ERROR("Consumer ~s-~w terminate reason: ~p",
@@ -858,17 +897,19 @@ safe_gen_call(Server, Call, Timeout) ->
-spec maybe_init_connection(state()) ->
{ok, state()} | {{error, any()}, state()}.
maybe_init_connection(
- #state{ bootstrap = Bootstrap
+ #state{ client_pid = ClientPid
+ , bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, connection = ?undef
} = State0) ->
%% Lookup, or maybe (re-)establish a connection to partition leader
- case connect_leader(Bootstrap, Topic, Partition) of
+ {MonitorOrLink, Result} = connect_leader(ClientPid, Bootstrap, Topic, Partition),
+ case Result of
{ok, Connection} ->
- Mref = case is_shared_conn(Bootstrap) of
- true -> erlang:monitor(process, Connection);
- false -> ?undef %% linked
+ Mref = case MonitorOrLink of
+ monitor -> erlang:monitor(process, Connection);
+ linked -> ?undef
end,
%% Switching to a new connection
%% the response for last_req_ref will be lost forever
@@ -883,13 +924,23 @@ maybe_init_connection(
maybe_init_connection(State) ->
{ok, State}.
-connect_leader(ClientPid, Topic, Partition) when is_pid(ClientPid) ->
- brod_client:get_leader_connection(ClientPid, Topic, Partition);
-connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
- connect_leader({Endpoints, []}, Topic, Partition);
-connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
+connect_leader(ClientPid, ?IGNORE, Topic, Partition) when is_pid(ClientPid) ->
+ {monitor, brod_client:get_leader_connection(ClientPid, Topic, Partition)};
+connect_leader(ClientPid, ?GET_FROM_CLIENT, Topic, Partition) when is_pid(ClientPid) ->
+ case brod_client:get_bootstrap(ClientPid) of
+ {ok, Bootstrap} ->
+ link_connect_leader(Bootstrap, Topic, Partition);
+ {error, Reason} ->
+ {linked, {error, Reason}}
+ end;
+connect_leader(?IGNORE, Bootstrap, Topic, Partition) ->
+ link_connect_leader(Bootstrap, Topic, Partition).
+
+link_connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
+ link_connect_leader({Endpoints, []}, Topic, Partition);
+link_connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
%% connection pid is linked to self()
- kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition).
+ {linked, kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition)}.
%% Send a ?INIT_CONNECTION delayed loopback message to re-init.
-spec maybe_send_init_connection(state()) -> ok.
@@ -900,9 +951,6 @@ maybe_send_init_connection(#state{subscriber = Subscriber}) ->
erlang:send_after(Timeout, self(), ?INIT_CONNECTION),
ok.
-%% In case 'bootstrap' is a client pid, connection is shared with other workers.
-is_shared_conn(Bootstrap) -> is_pid(Bootstrap).
-
%%%_* Tests ====================================================================
-ifdef(TEST).
diff --git a/test/brod_consumer_SUITE.erl b/test/brod_consumer_SUITE.erl
index 77e602c0..f8e72262 100644
--- a/test/brod_consumer_SUITE.erl
+++ b/test/brod_consumer_SUITE.erl
@@ -37,7 +37,8 @@
, t_direct_fetch_expand_max_bytes/1
, t_resolve_offset/1
, t_consumer_max_bytes_too_small/1
- , t_consumer_connection_restart/1
+ , t_consumer_connection_restart_0/1
+ , t_consumer_connection_restart_1/1
, t_consumer_connection_restart_2/1
, t_consumer_resubscribe/1
, t_consumer_resubscribe_earliest/1
@@ -536,8 +537,20 @@ t_consumer_max_bytes_too_small(Config) ->
end).
%% @doc Consumer should auto recover from connection down, subscriber should not
-%% notice a thing except for a few seconds of break in data streaming
-t_consumer_connection_restart(Config) ->
+%% notice a thing except for a few seconds of break in data streaming.
+%% Covers the case when connection is shared with other partition leaders
+t_consumer_connection_restart_0(Config) ->
+ ConsumerConfig = [{share_leader_conn, true} | consumer_config()],
+ consumer_connection_restart(Config, ConsumerConfig).
+
+%% @doc Consumer should auto recover from connection down, subscriber should not
+%% notice a thing except for a few seconds of break in data streaming.
+%% Covers the case when connection is NOT shared with other partition leaders
+t_consumer_connection_restart_1(Config) ->
+ ConsumerConfig = [{share_leader_conn, false} | consumer_config()],
+ consumer_connection_restart(Config, ConsumerConfig).
+
+consumer_connection_restart(Config, ConsumerConfig) ->
Client = ?config(client),
Topic = ?TOPIC,
Partition = 0,
@@ -546,7 +559,7 @@ t_consumer_connection_restart(Config) ->
, {prefetch_bytes, 0}
, {min_bytes, 1}
, {max_bytes, 12} %% ensure fetch exactly one message at a time
- | consumer_config()
+ | ConsumerConfig
],
{ok, ConsumerPid} =
brod_consumer:start_link(whereis(Client), Topic, Partition, ConsumerCfg),
@@ -586,11 +599,31 @@ t_consumer_connection_restart(Config) ->
Nums2 = Receive(Nums1, 5000),
?assertError(timeout, Receive(Nums2, 100)),
?assertEqual(NumsCnt - 2, length(Nums2)),
- ?assertEqual({ok, NewConnPid},
- brod_client:get_leader_connection(Client, Topic, Partition)),
- ok = brod_consumer:stop(ConsumerPid),
- ?assertNot(is_process_alive(ConsumerPid)),
- ?assert(is_process_alive(NewConnPid)), %% managed by brod_client
+ case proplists:get_bool(share_leader_conn, ConsumerConfig) of
+ true ->
+ ?assertEqual({ok, NewConnPid},
+ brod_client:get_leader_connection(Client, Topic, Partition)),
+ ok = brod_consumer:stop(ConsumerPid),
+ ?assertNot(is_process_alive(ConsumerPid)),
+ ?assert(is_process_alive(NewConnPid));
+ false ->
+ %% assert normal shutdown
+ Ref1 = erlang:monitor(process, NewConnPid),
+ Ref2 = erlang:monitor(process, ConsumerPid),
+ %% assert connection linked to consumer
+ {links, Links} = process_info(ConsumerPid, links),
+ ?assert(lists:member(NewConnPid, Links)),
+ ok = brod_consumer:stop(ConsumerPid),
+ Wait = fun() ->
+ ?WAIT_ONLY({'DOWN', Ref, process, _, Reason},
+ begin
+ ?assertEqual(normal, Reason),
+ ?assert(Ref =:= Ref1 orelse Ref =:= Ref2)
+ end)
+ end,
+ Wait(),
+ Wait()
+ end,
ok.
%% @doc same as t_consumer_connection_restart,