From bf928601f818d08739720aac39a16dc17f60a9f8 Mon Sep 17 00:00:00 2001 From: Laur Sisask Date: Wed, 8 May 2024 13:28:04 +0300 Subject: [PATCH 1/8] Fix coordinator process exiting due to heartbeat race Brod group coordinator periodically sends heartbeats to the Kafka broker. If it does not receive a response to a request within the configured timeout, it exits with reason `hb_timeout`. There was a race condition where the connection to the Kafka broker was closed after a heartbeat was sent out, but before a heartbeat response was received. When this happened, brod still expected to receive a response to the heartbeat. But since the connection had closed, this response never came and the process exited with `hb_timeout`. This error consistently happens once an hour in all our Elixir deployments that use brod. It looks like Amazon MSK closes the Kafka connection from the broker side every hour, for some reason always right after the client sends a heartbeat request. I do not know why this happens, but regardless, the server has the right to close the connection and the application should be able to handle that without causing error noise. This commit fixes the race condition. Now, when the connection goes down, we remove the reference to the heartbeat request that was last sent out. By removing this reference, the coordinator will no longer expect a response to the heartbeat request. Should the connection be re-established, the coordinator will start sending out new heartbeat requests as usual. I tested the solution on my own computer by adding a custom TCP proxy in front of Kafka, where I had the ability to terminate connections and introduce additional latency. With this setup, I was able to verify that with the previous version the same errors we saw in production occurred, but with the changes they no longer showed up. These are the errors that showed up in our logs: ``` Process #PID<0.19777.11> terminating ** (exit) :hb_timeout (stdlib 4.2) gen_server.erl:1241: :gen_server.handle_common_reply/8 (stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3 Initial Call: :brod_group_coordinator.init/1 Ancestors: [#PID<0.19775.11>, CallRouter.Supervisor, #PID<0.4065.0>] Neighbours: #PID<0.6845.12> Initial Call: :kpro_connection.init/4 Current Call: :kpro_connection.loop/2 Ancestors: [#PID<0.19777.11>, #PID<0.19775.11>, CallRouter.Supervisor, #PID<0.4065.0>] ``` ``` GenServer #PID<0.1262.11> terminating ** (stop) :hb_timeout Last message: :lo_cmd_send_heartbeat ``` XT-19 --- src/brod_group_coordinator.erl | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index 5ae1ac9c..50884d1a 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -526,7 +526,11 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds State3 = State2#state{is_in_group = false}, %% 4. Clean up state based on the last failure reason - State = maybe_reset_member_id(State3, Reason), + State4 = maybe_reset_member_id(State3, Reason), + + %% 5. Clean up ongoing heartbeat request ref if connection + %% was closed + State = maybe_reset_hb_ref(State4, Reason), %% 5. ensure we have a connection to the (maybe new) group coordinator F1 = fun discover_coordinator/1, @@ -591,6 +595,15 @@ should_reset_member_id({connection_down, _Reason}) -> should_reset_member_id(_) -> false. +%% When the connection goes down while waiting for a heartbeat +%% response, the response will never be received. 
+%% Reset the heartbeat ref to let a new heartbeat request +%% be sent over the new connection. +maybe_reset_hb_ref(State, {connection_down, _Reason}) -> + State#state{hb_ref = ?undef}; +maybe_reset_hb_ref(State, _) -> + State. + -spec join_group(state()) -> {ok, state()}. join_group(#state{ groupId = GroupId , memberId = MemberId0 From c34fa870f00719fee6bc4cbff04f12121f08be6d Mon Sep 17 00:00:00 2001 From: zmstone Date: Thu, 16 May 2024 12:09:31 +0200 Subject: [PATCH 2/8] docs: update changelog, prepare for 3.18.0 release --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6707586a..1feefd57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +- 3.18.0 + - Add transactional APIs. [PR#549](https://github.com/kafka4beam/brod/pull/549) + - Fix unnecessary group coordinator restart due to `hb_timeout` exception. [PR#578](https://github.com/kafka4beam/brod/pull/578) + - Changed supervisor3 progress log level from `info` to `debug`. [PR#572](https://github.com/kafka4beam/brod/pull/572) + - Type spec fix. [PR#571](https://github.com/kafka4beam/brod/pull/571) + - Remove unused macro. [PR#575](https://github.com/kafka4beam/brod/pull/575) + - 3.17.1 - Upgrade `kafka_protocol` from 4.1.3 to 4.1.5 - Allow space after `,` in comma-separated bootstrapping host:port list From f35d281bb93de16023a5d8a3397f392471fa0a33 Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Mon, 27 May 2024 23:33:37 +0300 Subject: [PATCH 3/8] Forward unhandled messages to optional subscriber handle_info callback The motivation for adding handle_info callbacks is to allow subscriber worker processes which are spawned by brod to participate in message passing, supporting a variety of use cases utilizing async acking and committing. An example use case: * Start a group subscriber using `brod_group_subscriber_v2` * In a partition worker, spawn a new process for every message under a supervisor specific to the worker's topic-partition * When the supervisor has <= N processes, ack the last seen offset to fetch new messages. When the supervisor has > N processes, messages are not acked, to apply backpressure * When all processes up to offset O1 have completed, commit offset O1 Allowing arbitrary message passing in the topic and group subscriber workers supports not only that use case but many others. --- src/brod_group_subscriber_worker.erl | 11 +++++- src/brod_topic_subscriber.erl | 17 +++++++-- test/brod_topic_subscriber_SUITE.erl | 55 ++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 4 deletions(-) diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl index 4f0c40ae..7dd69f9f 100644 --- a/src/brod_group_subscriber_worker.erl +++ b/src/brod_group_subscriber_worker.erl @@ -22,7 +22,7 @@ -include("brod_int.hrl"). %% brod_topic_subscriber callbacks --export([init/2, handle_message/3, terminate/2]). +-export([init/2, handle_message/3, handle_info/2, terminate/2]). -type start_options() :: #{ group_id := brod:group_id() @@ -91,6 +91,15 @@ handle_message(_Partition, Msg, State) -> {ok, NewState} end. +handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + %% Only the {noreply, State} return value is supported. + case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of + {noreply, NewCbState} -> + {noreply, State#state{cb_state = NewCbState}} + end. 
+ terminate(Reason, #state{cb_module = CbModule, cb_state = State}) -> brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok). diff --git a/src/brod_topic_subscriber.erl b/src/brod_topic_subscriber.erl index 08f6512d..9b8add98 100644 --- a/src/brod_topic_subscriber.erl +++ b/src/brod_topic_subscriber.erl @@ -108,7 +108,12 @@ %% This callback is called before stopping the subscriber -callback terminate(_Reason, cb_state()) -> _. --optional_callbacks([terminate/2]). +%% This callback is called when the subscriber receives a message unrelated to +%% the subscription. +%% The callback must return `{noreply, NewCallbackState}'. +-callback handle_info(_Msg, cb_state()) -> {noreply, cb_state()}. + +-optional_callbacks([terminate/2, handle_info/2]). %%%_* Types and macros ========================================================= @@ -357,8 +362,14 @@ handle_info({'DOWN', _Mref, process, Pid, Reason}, %% not a consumer pid {noreply, State} end; -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + %% Only the {noreply, State} return value is supported. + case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of + {noreply, NewCbState} -> + {noreply, State#state{cb_state = NewCbState}} + end. %% @private handle_call(Call, _From, State) -> diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 9c022a9d..49f72e91 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -30,6 +30,7 @@ -export([ init/2 , terminate/2 , handle_message/3 + , handle_info/2 ]). %% Test cases @@ -40,6 +41,7 @@ , t_callback_crash/1 , t_begin_offset/1 , t_cb_fun/1 + , t_consumer_ack_via_message_passing/1 ]). -include("brod_test_setup.hrl"). @@ -107,6 +109,21 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck false -> {ok, ack, State} end. +handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter + , worker_id = Ref + } = State0) -> + %% Participate in state continuity checks + ?tp(topic_subscriber_seen_info, + #{ partition => Partition + , offset => Offset + , msg => Msg + , state => Counter + , worker_id => Ref + }), + State = State0#state{counter = Counter + 1}, + ok = brod_topic_subscriber:ack(self(), Partition, Offset), + {noreply, State}. + terminate(Reason, #state{worker_id = Ref, counter = Counter}) -> ?tp(topic_subscriber_terminate, #{ worker_id => Ref @@ -184,6 +201,44 @@ t_async_acks(Config) when is_list(Config) -> check_init_terminate(Trace) end). +t_consumer_ack_via_message_passing(Config) when is_list(Config) -> + %% Process messages one by one with no prefetch + ConsumerConfig = [ {prefetch_count, 0} + , {prefetch_bytes, 0} + , {sleep_timeout, 0} + , {max_bytes, 0} + ], + Partition = 0, + SendFun = + fun(I) -> + produce({?topic, Partition}, <<I>>) + end, + ?check_trace( %% Run stage: begin O0 = SendFun(0), %% Send two messages Offset0 = SendFun(1), _Offset1 = SendFun(2), InitArgs = {_IsAsyncAck = true, _ConsumerOffsets = [{0, O0}]}, {ok, SubscriberPid} = brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig, ?MODULE, InitArgs), {ok, _} = wait_message(<<1>>), %% ack_offset allows consumer to proceed to message 2 SubscriberPid ! 
{ack_offset, 0, Offset0}, + {ok, _} = wait_message(<<2>>), + ok = brod_topic_subscriber:stop(SubscriberPid), + _Expected = [<<1>>, <<2>>] + end, %% Check stage: + fun(Expected, Trace) -> + check_received_messages(Expected, Trace), + check_state_continuity(Trace), + check_init_terminate(Trace) + end). + t_begin_offset(Config) when is_list(Config) -> ConsumerConfig = [ {prefetch_count, 100} , {prefetch_bytes, 0} %% as discard From f32caafc49e6582e6a3c0c05676d2fb4bd5fb13d Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Fri, 7 Jun 2024 17:35:27 +0300 Subject: [PATCH 4/8] Add changelog for version 3.19.0 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1feefd57..820e4874 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +- 3.19.0 + - Forward unhandled messages in topic/group consumer processes to handle_info/2 callbacks + in order to support arbitrary message passing [PR#580](https://github.com/kafka4beam/brod/pull/580) + - 3.18.0 - Add transactional APIs. [PR#549](https://github.com/kafka4beam/brod/pull/549) - Fix unnecessary group coordinator restart due to `hb_timeout` exception. [PR#578](https://github.com/kafka4beam/brod/pull/578) From 5e63808e5de170d31f7383778db3b78d3c4aaac9 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 15 Jun 2024 12:36:29 +0200 Subject: [PATCH 5/8] Fix CI --- .gitignore | 3 +- CHANGELOG.md | 7 ++- guides/examples/Authentication.md | 1 + scripts/docker-compose.yml | 28 ++---------- scripts/setup-test-env.sh | 2 +- src/brod_cli.erl | 43 +++++++++++++++---- test/brod_cli_tests.erl | 16 ++++--- test/brod_client_SUITE.erl | 12 +++--- priv/ssl/ca.crt => test/data/ssl/ca.pem | 0 .../data/ssl/client-crt.pem | 0 .../data/ssl/client-key.pem | 0 11 files changed, 65 insertions(+), 47 deletions(-) rename priv/ssl/ca.crt => test/data/ssl/ca.pem (100%) rename priv/ssl/client.crt => test/data/ssl/client-crt.pem (100%) rename priv/ssl/client.key => test/data/ssl/client-key.pem (100%) diff --git a/.gitignore b/.gitignore index 516ec934..68fece43 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,5 @@ _rel/ *.log relx docker/ -TAGS \ No newline at end of file +TAGS +.vscode/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 820e4874..c73416cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,13 @@ # Changelog +- 3.19.1 + - Made brod-cli work on OTP 26. [PR#582](https://github.com/kafka4beam/brod/pull/582) + - `--ssl` option is now mandatory if TLS is to be used (previously it could be derived from the `--cacertfile` option) + - TLS version defaults to 1.2, added `--ssl-versions` to support explicitly setting TLS 1.3 + - 3.19.0 - Forward unhandled messages in topic/group consumer processes to handle_info/2 callbacks - in order to support arbitrary message passing [PR#580](https://github.com/kafka4beam/brod/pull/580) + in order to support arbitrary message passing [PR#580](https://github.com/kafka4beam/brod/pull/580) - 3.18.0 - Add transactional APIs. 
[PR#549](https://github.com/kafka4beam/brod/pull/549) diff --git a/guides/examples/Authentication.md b/guides/examples/Authentication.md index e593514b..a3065655 100644 --- a/guides/examples/Authentication.md +++ b/guides/examples/Authentication.md @@ -60,6 +60,7 @@ For more info see the Erlang Ecosystem Foundation's [server certificate verifica , { depth, 3 } , { customize_hostname_check, [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]} + , {versions, ['tlsv1.3', 'tlsv1.2']} ]} , { sasl, {plain, "GFRW5BSQHKEH0TSG", "GrL3CNTkLhsvtBr8srGn0VilMpgDb4lPD"}} ] diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 272f0b4f..34d28561 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -1,35 +1,17 @@ version: "2" services: - pause: - image: "gcr.io/google_containers/pause-amd64:3.0" - networks: - - pausenet - ports: - - "2181:2181" - - "9092:9092" - - "9093:9093" - - "9094:9094" - - "9095:9095" - - "9192:9192" - - "9193:9193" - - "9194:9194" - - "9195:9195" - container_name: pause zookeeper: - depends_on: - - pause image: "zmstone/kafka:${KAFKA_VERSION}" container_name: zookeeper command: run zookeeper - network_mode: service:pause + network_mode: host kafka_1: depends_on: - - pause - zookeeper image: "zmstone/kafka:${KAFKA_VERSION}" container_name: "kafka-1" - network_mode: service:pause + network_mode: host environment: BROKER_ID: 0 PLAINTEXT_PORT: 9092 SSL_PORT: 9093 SASL_SSL_PORT: 9094 SASL_PLAINTEXT_PORT: 9095 ADVERTISED_HOSTNAME: localhost ZOOKEEPER_CONNECT: "localhost:2181" kafka_2: depends_on: - - pause - zookeeper image: "zmstone/kafka:${KAFKA_VERSION}" container_name: "kafka-2" - network_mode: service:pause + network_mode: host environment: BROKER_ID: 1 PLAINTEXT_PORT: 9192 SSL_PORT: 9193 SASL_SSL_PORT: 9194 SASL_PLAINTEXT_PORT: 9195 ADVERTISED_HOSTNAME: localhost ZOOKEEPER_CONNECT: "localhost:2181" - -networks: - pausenet: diff --git a/scripts/setup-test-env.sh b/scripts/setup-test-env.sh index b39fe63e..b877ef2f 100755 --- a/scripts/setup-test-env.sh +++ b/scripts/setup-test-env.sh @@ -18,7 +18,7 @@ function docker_compose { fi } -VERSION=${KAFKA_VERSION:-1.1} +VERSION=${KAFKA_VERSION:-2.4} if [ -z $VERSION ]; then VERSION=$1; fi case $VERSION in diff --git a/src/brod_cli.erl b/src/brod_cli.erl index b28b6302..e3f3cef8 100644 --- a/src/brod_cli.erl +++ b/src/brod_cli.erl @@ -61,6 +61,8 @@ commands: %% NOTE: bad indentation at the first line is intended -define(COMMAND_COMMON_OPTIONS, " --ssl Use TLS, validate server using trusted CAs + --ssl-versions= Specify SSL versions. Comma separated versions, e.g. 1.3,1.2 --cacertfile= Use TLS, validate server using the given certificate --certfile= Client certificate in case client authentication is enabled in brokers @@ -365,6 +367,7 @@ main(Command, Doc, Args, Stop, LogLevel) -> C1 : E1 ?BIND_STACKTRACE(Stack1) -> ?GET_STACKTRACE(Stack1), verbose("~p:~p\n~p\n", [C1, E1, Stack1]), + io:format(user, "~p~n", [{C1, E1, Stack1}]), ?STOP(Stop) end, case LogLevel =:= ?LOG_LEVEL_QUIET of @@ -1125,20 +1128,25 @@ parse_offset_time(T) -> int(T). 
parse_connection_config(Args) -> SslBool = parse(Args, "--ssl", fun parse_boolean/1), + SslVersions = parse(Args, "--ssl-versions", fun parse_ssl_versions/1), CaCertFile = parse(Args, "--cacertfile", fun parse_file/1), CertFile = parse(Args, "--certfile", fun parse_file/1), KeyFile = parse(Args, "--keyfile", fun parse_file/1), FilterPred = fun({_, V}) -> V =/= ?undef end, SslOpt = - case CaCertFile of - ?undef -> - SslBool; - _ -> - Files = + case SslBool of + true -> + Opts = [{cacertfile, CaCertFile}, {certfile, CertFile}, - {keyfile, KeyFile}], - lists:filter(FilterPred, Files) + {keyfile, KeyFile}, + {versions, SslVersions}, + %% TODO: verify_peer if cacertfile is provided + {verify, verify_none} + ], + lists:filter(FilterPred, Opts); + false -> + false end, SaslPlain = parse(Args, "--sasl-plain", fun parse_file/1), SaslScram256 = parse(Args, "--scram256", fun parse_file/1), @@ -1157,12 +1165,31 @@ parse_boolean(true) -> true; parse_boolean(false) -> false; parse_boolean("true") -> true; parse_boolean("false") -> false; -parse_boolean(?undef) -> ?undef. +parse_boolean(?undef) -> false. parse_cg_ids("") -> []; parse_cg_ids("all") -> all; parse_cg_ids(Str) -> [bin(I) || I <- string:tokens(Str, ",")]. +parse_ssl_versions(?undef) -> + parse_ssl_versions(""); +parse_ssl_versions(Versions) -> + case lists:map(fun parse_ssl_version/1, string:tokens(Versions, ", ")) of + [] -> + ['tlsv1.2']; + Vsns -> + Vsns + end. + +parse_ssl_version("1.2") -> + 'tlsv1.2'; +parse_ssl_version("1.3") -> + 'tlsv1.3'; +parse_ssl_version("1.1") -> + 'tlsv1.1'; +parse_ssl_version(Other) -> + error({unsupported_tls_version, Other}). + parse_file(?undef) -> ?undef; parse_file(Path) -> diff --git a/test/brod_cli_tests.erl b/test/brod_cli_tests.erl index a9615283..4d382fb2 100644 --- a/test/brod_cli_tests.erl +++ b/test/brod_cli_tests.erl @@ -37,9 +37,12 @@ meta_test() -> ssl_test() -> run(["meta", "-b", "localhost:9093", "-L", - "--cacertfile", "priv/ssl/ca.crt", - "--keyfile", "priv/ssl/client.key", - "--certfile", "priv/ssl/client.crt"]). + "--ssl", + "--cacertfile", "test/data/ssl/ca.pem", + "--keyfile", "test/data/ssl/client-key.pem", + "--certfile", "test/data/ssl/client-crt.pem", + "--ssl-versions", "1.2,1.1" + ]). offset_test() -> Args = ["offset", "-b", "localhost", "-t", "test-topic", "-p", "0"], @@ -74,9 +77,10 @@ test_sasl() -> Output = cmd(["send", "--brokers", "localhost:9194,localhost:9094", "-t", "test-topic", "-p", "0", - "--cacertfile", "priv/ssl/ca.crt", - "--keyfile", "priv/ssl/client.key", - "--certfile", "priv/ssl/client.crt", + "--ssl", + "--cacertfile", "test/data/ssl/ca.pem", + "--keyfile", "test/data/ssl/client-key.pem", + "--certfile", "test/data/ssl/client-crt.pem", "--sasl-plain", "sasl.testdata", "-k", K, "-v", V]), ?assertEqual(<<"">>, Output), diff --git a/test/brod_client_SUITE.erl b/test/brod_client_SUITE.erl index dfef00aa..eff92257 100644 --- a/test/brod_client_SUITE.erl +++ b/test/brod_client_SUITE.erl @@ -385,11 +385,13 @@ t_magic_version(Config) when is_list(Config) -> auth(_Host, _Sock, _Mod, _ClientId, _Timeout, _Opts) -> ok. 
ssl_options() -> - PrivDir = code:priv_dir(brod), - Fname = fun(Name) -> filename:join([PrivDir, ssl, Name]) end, - [ {cacertfile, Fname("ca.crt")} - , {keyfile, Fname("client.key")} - , {certfile, Fname("client.crt")} + LibDir = code:lib_dir(brod), + Fname = fun(Name) -> filename:join([LibDir, test, data, ssl, Name]) end, + [ {cacertfile, Fname("ca.pem")} + , {keyfile, Fname("client-key.pem")} + , {certfile, Fname("client-crt.pem")} + , {versions, ['tlsv1.2']} + , {verify, verify_none} ]. produce_and_consume_message(Host, Client, ClientConfig) -> diff --git a/priv/ssl/ca.crt b/test/data/ssl/ca.pem similarity index 100% rename from priv/ssl/ca.crt rename to test/data/ssl/ca.pem diff --git a/priv/ssl/client.crt b/test/data/ssl/client-crt.pem similarity index 100% rename from priv/ssl/client.crt rename to test/data/ssl/client-crt.pem diff --git a/priv/ssl/client.key b/test/data/ssl/client-key.pem similarity index 100% rename from priv/ssl/client.key rename to test/data/ssl/client-key.pem From 41dbeda82378d54a5d8d89ceaad0f8266acf79e5 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sun, 16 Jun 2024 11:12:39 +0200 Subject: [PATCH 6/8] test: add Kafka 3.6 to CI --- .github/workflows/build.yml | 11 +-- .gitignore | 1 + Makefile | 6 ++ rebar.config | 8 +- scripts/docker-compose.yml | 6 +- scripts/setup-test-env.sh | 77 +++++++++++++----- src/brod_consumer.erl | 31 +++++--- src/brod_utils.erl | 21 ++++- test/brod_SUITE.erl | 9 ++- test/brod_cli_tests.erl | 8 +- test/brod_compression_SUITE.erl | 5 +- test/brod_consumer_SUITE.erl | 100 +++++++++++++++--------- test/brod_demo_group_subscriber_koc.erl | 5 +- test/brod_demo_group_subscriber_loc.erl | 5 +- test/brod_demo_topic_subscriber.erl | 5 +- test/brod_group_coordinator_SUITE.erl | 5 +- test/brod_offset_txn_SUITE.erl | 18 +++-- test/brod_producer_SUITE.erl | 5 +- test/brod_topic_subscriber_SUITE.erl | 2 +- test/brod_txn_SUITE.erl | 14 ++-- test/brod_txn_processor_SUITE.erl | 15 ++-- test/data/ssl/README.md | 3 + test/data/ssl/ca.pem | 22 ------ test/data/ssl/client-crt.pem | 20 ----- test/data/ssl/client-key.pem | 28 ------- test/kafka_test_helper.erl | 32 ++++++-- 26 files changed, 253 insertions(+), 209 deletions(-) create mode 100644 test/data/ssl/README.md delete mode 100644 test/data/ssl/ca.pem delete mode 100644 test/data/ssl/client-crt.pem delete mode 100644 test/data/ssl/client-key.pem diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1ab8ffd0..209da873 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,8 +7,8 @@ on: branches: - master env: - OTP_VERSION: "24.1" - REBAR_VERSION: "3.17.0" + OTP_VERSION: "26" + REBAR_VERSION: "3.20.0" jobs: lint: @@ -44,8 +44,8 @@ jobs: strategy: fail-fast: false matrix: - otp: ["24.1", "23.3.4.7", "22.3.4.21"] - kafka: ["2.4", "1.1", "0.11"] + otp: ["26"] + kafka: ["0.9", "0.10", "0.11", "2.8", "1.1", "3.6"] steps: - name: Checkout uses: actions/checkout@v2 @@ -69,7 +69,8 @@ jobs: run: | export KAFKA_VERSION=${{ matrix.kafka }} echo "Running Kafka ${KAFKA_VERSION}" - scripts/setup-test-env.sh && rebar3 do ct,eunit + make test-env + make t - name: Store test logs uses: actions/upload-artifact@v1 if: always() diff --git a/.gitignore b/.gitignore index 68fece43..0e07c25f 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ relx docker/ TAGS .vscode/ +test/data/ssl/*.pem diff --git a/Makefile b/Makefile index 426e39a2..7b7d6db9 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +KAFKA_VERSION ?= 3.6 +export KAFKA_VERSION all: compile compile: @@ -8,6 
+10,10 @@ lint: test-env: @./scripts/setup-test-env.sh + @mkdir -p ./test/data/ssl + @docker cp kafka-1:/localhost-ca-crt.pem ./test/data/ssl/ca.pem + @docker cp kafka-1:/localhost-client-key.pem ./test/data/ssl/client-key.pem + @docker cp kafka-1:/localhost-client-crt.pem ./test/data/ssl/client-crt.pem ut: @rebar3 eunit -v --cover_export_name ut-$(KAFKA_VERSION) diff --git a/rebar.config b/rebar.config index 4637d1e4..d6a68551 100644 --- a/rebar.config +++ b/rebar.config @@ -17,9 +17,9 @@ {relx, [{release, {brod, "i"}, % release the interactive shell as brod-i [brod, jsone, docopt]}, {include_erts, true}, - {overlay, [{copy, "scripts/brod", "bin"}, - {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin"}, - {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin"} + {overlay, [{copy, "scripts/brod", "bin/"}, + {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin/"}, + {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin/"} ]} ]}]}, {test, [ @@ -28,7 +28,7 @@ , {jsone, "1.7.0"} , {meck, "0.9.2"} , {proper, "1.4.0"} - , {snabbkaffe, "1.0.1"} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {branch, "1.0.8"}}} ]}, {erl_opts, [warnings_as_errors, {d, build_brod_cli}]} ]} diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 34d28561..45839f99 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -2,14 +2,14 @@ version: "2" services: zookeeper: - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: zookeeper command: run zookeeper network_mode: host kafka_1: depends_on: - zookeeper - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: "kafka-1" network_mode: host environment: @@ -23,7 +23,7 @@ services: kafka_2: depends_on: - zookeeper - image: "zmstone/kafka:${KAFKA_VERSION}" + image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}" container_name: "kafka-2" network_mode: host environment: diff --git a/scripts/setup-test-env.sh b/scripts/setup-test-env.sh index b877ef2f..25264fd1 100755 --- a/scripts/setup-test-env.sh +++ b/scripts/setup-test-env.sh @@ -1,5 +1,9 @@ #!/bin/bash -eu +if [ -n "${DEBUG:-}" ]; then + set -x +fi + docker ps > /dev/null || { echo "You must be a member of docker group to run this script" exit 1 @@ -18,46 +22,77 @@ function docker_compose { fi } -VERSION=${KAFKA_VERSION:-2.4} -if [ -z $VERSION ]; then VERSION=$1; fi +KAFKA_VERSION=${KAFKA_VERSION:-3.6} +if [ -z $KAFKA_VERSION ]; then KAFKA_VERSION=$1; fi -case $VERSION in +case $KAFKA_VERSION in + 0.9*) + KAFKA_VERSION="0.9";; 0.10*) - VERSION="0.10";; + KAFKA_VERSION="0.10";; 0.11*) - VERSION="0.11";; + KAFKA_VERSION="0.11";; 1.*) - VERSION="1.1";; + KAFKA_VERSION="1.1";; 2.*) - VERSION="2.4";; + KAFKA_VERSION="2.8";; + 3.*) + KAFKA_VERSION="3.6";; *) - VERSION="2.4";; + KAFKA_VERSION="3.6";; esac -echo "Using KAFKA_VERSION=$VERSION" -export KAFKA_VERSION=$VERSION +export KAFKA_IMAGE_VERSION="1.1-${KAFKA_VERSION}" +echo "env KAFKA_IMAGE_VERSION=$KAFKA_IMAGE_VERSION" TD="$(cd "$(dirname "$0")" && pwd)" docker_compose -f $TD/docker-compose.yml down || true docker_compose -f $TD/docker-compose.yml up -d +if [[ "$KAFKA_VERSION" == 2* ]] || [[ "$KAFKA_VERSION" == 3* ]]; then + MAYBE_ZOOKEEPER="--bootstrap-server localhost:9092" +else + MAYBE_ZOOKEEPER="--zookeeper localhost:2181" +fi -n=0 -while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do - if [ $n -gt 4 ]; then - echo "timeout waiting for kakfa_1" - 
exit 1 +TOPIC_LIST_CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --list" +MAX_WAIT_SEC=10 + +function wait_for_kafka { + local which_kafka="$1" + local n=0 + local port=':9092' + local topic_list listener + if [ "$which_kafka" = 'kafka-2' ]; then + port=':9192' fi - n=$(( n + 1 )) - sleep 1 -done + while true; do + listener="$(netstat -tnlp 2>&1 | grep $port || true)" + if [ "$listener" != '' ]; then + topic_list="$(docker exec $which_kafka $TOPIC_LIST_CMD 2>&1)" + if [ "${topic_list-}" = '' ]; then + break + fi + fi + if [ $n -gt $MAX_WAIT_SEC ]; then + echo "timeout waiting for kafka-1" + echo "last print: ${topic_list:-}" + exit 1 + fi + n=$(( n + 1 )) + sleep 1 + done +} + +wait_for_kafka kafka-1 +wait_for_kafka kafka-2 function create_topic { TOPIC_NAME="$1" PARTITIONS="${2:-1}" REPLICAS="${3:-1}" - CMD="/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" + CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" docker exec kafka-1 bash -c "$CMD" } @@ -80,7 +115,7 @@ create_topic "brod_compression_SUITE" create_topic "lz4-test" create_topic "test-topic" -if [[ "$KAFKA_VERSION" = 2* ]]; then +if [[ "$KAFKA_VERSION" = 2* ]] || [[ "$KAFKA_VERSION" = 3* ]]; then MAYBE_NEW_CONSUMER="" else MAYBE_NEW_CONSUMER="--new-consumer" @@ -90,5 +125,5 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l # for kafka 0.11 or later, add sasl-scram test credentials if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then - docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice + docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh $MAYBE_ZOOKEEPER --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice fi diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 37cb1756..09f91c76 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -462,7 +462,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref1}, } = State) when Ref1 =/= Ref2 -> %% Not expected response, discard {noreply, State}; -handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, +handle_fetch_response(#kpro_rsp{ref = Ref, vsn = Vsn} = Rsp, #state{ topic = Topic , partition = Partition , last_req_ref = Ref @@ -472,7 +472,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, {ok, #{ header := Header , batches := Batches }} -> - handle_batches(Header, Batches, State); + handle_batches(Header, Batches, State, Vsn); {error, ErrorCode} -> Error = #kafka_fetch_error{ topic = Topic , partition = Partition @@ -481,7 +481,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, handle_fetch_error(Error, State) end. 
-handle_batches(?undef, [], #state{} = State0) -> +handle_batches(?undef, [], #state{} = State0, _Vsn) -> %% It is only possible to end up here in a incremental %% fetch session, empty fetch response implies no %% new messages to fetch, and no changes in partition @@ -491,25 +491,38 @@ handle_batches(?undef, [], #state{} = State0) -> State = maybe_delay_fetch_request(State0), {noreply, State}; handle_batches(_Header, ?incomplete_batch(Size), - #state{max_bytes = MaxBytes} = State0) -> + #state{max_bytes = MaxBytes} = State0, _Vsn) -> %% max_bytes is too small to fetch ONE complete batch true = Size > MaxBytes, %% assert State1 = State0#state{max_bytes = Size}, State = maybe_send_fetch_request(State1), {noreply, State}; -handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) -> +handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0, Vsn) -> StableOffset = brod_utils:get_stable_offset(Header), State = case BeginOffset < StableOffset of - true -> - %% There are chances that kafka may return empty message set + true when Vsn > 0 -> + %% There are chances that Kafka may return empty message set %% when messages are deleted from a compacted topic. %% Since there is no way to know how big the 'hole' is %% we can only bump begin_offset with +1 and try again. + ?BROD_LOG_WARNING("~s-~p empty_batch_detected_at_offset=~p, " "fetch_api_vsn=~p, skip_to_offset=~p", [State0#state.topic, State0#state.partition, BeginOffset, Vsn, BeginOffset + 1 ]), State1 = State0#state{begin_offset = BeginOffset + 1}, maybe_send_fetch_request(State1); + true -> + %% Fetch API v0 (Kafka 0.9 and 0.10) seems to have a race condition: + %% Kafka returns empty batch even if BeginOffset is lower than high-watermark + %% if fetch request is sent in a tight loop + %% Retry seems to resolve the issue + maybe_delay_fetch_request(State0); false -> - %% we have either reached the end of a partition + %% We have either reached the end of a partition %% or trying to read uncommitted messages %% try to poll again (maybe after a delay) maybe_delay_fetch_request(State0) @@ -521,7 +534,7 @@ handle_batches(Header, Batches, , begin_offset = BeginOffset , topic = Topic , partition = Partition - } = State0) -> + } = State0, _Vsn) -> StableOffset = brod_utils:get_stable_offset(Header), {NewBeginOffset, Messages} = brod_utils:flatten_batches(BeginOffset, Header, Batches), diff --git a/src/brod_utils.erl b/src/brod_utils.erl index 61113519..f7732a1a 100644 --- a/src/brod_utils.erl +++ b/src/brod_utils.erl @@ -453,12 +453,25 @@ fetch(Conn, ReqFun, Offset, MaxBytes) -> fetch(Conn, ReqFun, Offset, Size); {ok, #{header := Header, batches := Batches}} -> StableOffset = get_stable_offset(Header), - {NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches), + {NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches), case Offset < StableOffset andalso Msgs =:= [] of true -> - %% Not reached the latest stable offset yet, - %% but received an empty batch-set (all messages are dropped). - %% try again with new begin-offset + NewBeginOffset = + case NewBeginOffset0 > Offset of + true -> + %% Not reached the latest stable offset yet, + %% but resulted in an empty batch-set, + %% i.e. all messages are dropped because they are before + %% the last fetch Offset. + %% try again with new begin-offset. + NewBeginOffset0; + false when NewBeginOffset0 =:= Offset -> + %% There are chances that Kafka may return empty message set + %% when messages are deleted from a compacted topic. 
+ %% Since there is no way to know how big the 'hole' is + %% we can only bump begin_offset with +1 and try again. + NewBeginOffset0 + 1 + end, fetch(Conn, ReqFun, NewBeginOffset, MaxBytes); false -> {ok, {StableOffset, Msgs}} diff --git a/test/brod_SUITE.erl b/test/brod_SUITE.erl index 0ea476be..a7f0bb51 100644 --- a/test/brod_SUITE.erl +++ b/test/brod_SUITE.erl @@ -41,10 +41,11 @@ suite() -> [{timetrap, {minutes, 5}}]. init_per_suite(Config) -> - case os:getenv("KAFKA_VERSION") of - "0.9" -> {skip, - "The given Kafka test image does not have support for these apis"}; - _ -> Config + case kafka_test_helper:kafka_version() of + {0, 9} -> + {skip, "no_topic_management_apis"}; + _ -> + Config end. end_per_suite(_Config) -> diff --git a/test/brod_cli_tests.erl b/test/brod_cli_tests.erl index 4d382fb2..014c9e1f 100644 --- a/test/brod_cli_tests.erl +++ b/test/brod_cli_tests.erl @@ -180,7 +180,13 @@ get_kafka_version() -> {list_to_integer(Major), list_to_integer(Minor)} end. -run(Args) -> +run(Args0) -> + Args = case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + Args0 ++ ["--no-api-vsn-query"]; + _ -> + Args0 + end, _ = cmd(Args), ok. diff --git a/test/brod_compression_SUITE.erl b/test/brod_compression_SUITE.erl index ac8b9122..b51540b9 100644 --- a/test/brod_compression_SUITE.erl +++ b/test/brod_compression_SUITE.erl @@ -201,10 +201,7 @@ start_client(Hosts, ClientId) -> brod:start_client(Hosts, ClientId, Config). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_consumer_SUITE.erl b/test/brod_consumer_SUITE.erl index 4c60a4a0..b5e6db8c 100644 --- a/test/brod_consumer_SUITE.erl +++ b/test/brod_consumer_SUITE.erl @@ -76,6 +76,17 @@ end end()). +-define(RETRY(EXPR, Timeout), + retry( + fun() -> + case EXPR of + {ok, R} -> + {ok, R}; + {error, _} -> + false + end + end, Timeout)). + %%%_* ct callbacks ============================================================= suite() -> [{timetrap, {seconds, 30}}]. @@ -104,11 +115,7 @@ init_per_testcase(Case, Config0) -> end, ClientConfig1 = proplists:get_value(client_config, Config, []), brod:stop_client(Client), - ClientConfig = - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end ++ ClientConfig0 ++ ClientConfig1, + ClientConfig = kafka_test_helper:client_config() ++ ClientConfig0 ++ ClientConfig1, ok = brod:start_client(?HOSTS, Client, ClientConfig), ok = brod:start_producer(Client, Topic, ProducerConfig), try ?MODULE:Case(standalone_consumer) of @@ -140,12 +147,21 @@ end_per_testcase(Case, Config) -> ok end. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + Cases = [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false - end]. - + end], + Filter = fun(Case) -> + try + ?MODULE:Case(kafka_version_match) + catch + _:_ -> + true + end + end, + lists:filter(Filter, Cases). %%%_* Test functions =========================================================== %% messages fetched back should only contain the committed message %% i.e. 
aborted messages (testing with isolation_level=read_committed) %% should be dropped, control messages (transaction abort) should be dropped +t_drop_aborted(kafka_version_match) -> + has_txn(); t_drop_aborted(Config) when is_list(Config) -> - case has_txn() of - true -> - test_drop_aborted(Config, true), - test_drop_aborted(Config, false); - false -> - ok - end. + test_drop_aborted(Config, true), + test_drop_aborted(Config, false). %% When QueryApiVsn is set to false, %% brod will use lowest supported API version. @@ -173,7 +186,7 @@ test_drop_aborted(Config, QueryApiVsn) -> fun(CommitOrAbort) -> TxnId = make_transactional_id(), {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), Key = bin([atom_to_list(CommitOrAbort), "-", make_unique_key()]), Vsn = 3, %% lowest API version which supports transactional produce @@ -217,11 +230,10 @@ test_drop_aborted(Config, QueryApiVsn) -> ], Msgs) end. +t_wait_for_unstable_offsets(kafka_version_match) -> + has_txn(); t_wait_for_unstable_offsets(Config) when is_list(Config) -> - case has_txn() of - true -> t_wait_for_unstable_offsets({run, Config}); - false -> ok - end; + t_wait_for_unstable_offsets({run, Config}); t_wait_for_unstable_offsets({run, Config}) -> Client = ?config(client), Topic = ?TOPIC, @@ -230,7 +242,7 @@ t_wait_for_unstable_offsets({run, Config}) -> {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), %% ensure we have enough time to test before expire TxnOpts = #{txn_timeout => timer:seconds(30)}, - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId, TxnOpts), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId, TxnOpts), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), %% Send one message in this transaction, return the offset in kafka ProduceFun = @@ -278,13 +290,10 @@ t_wait_for_unstable_offsets({run, Config}) -> %% Produce large(-ish) transactional batches, then abort them all %% try fetch from offsets in the middle of large batches, %% expect no delivery of any aborted batches. +t_fetch_aborted_from_the_middle(kafka_version_match) -> + has_txn(); t_fetch_aborted_from_the_middle(Config) when is_list(Config) -> - case has_txn() of - true -> - test_fetch_aborted_from_the_middle(Config); - false -> - ok - end. + test_fetch_aborted_from_the_middle(Config). test_fetch_aborted_from_the_middle(Config) when is_list(Config) -> Client = ?config(client), @@ -292,7 +301,7 @@ test_fetch_aborted_from_the_middle(Config) when is_list(Config) -> Partition = 0, TxnId = make_transactional_id(), {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), %% make a large-ish message MkMsg = fun(Key) -> @@ -413,6 +422,12 @@ t_fold(Config) when is_list(Config) -> 0, ErrorFoldF, #{})), ok. 
+%% This test case does not work with Kafka 0.9, not sure about 0.10 and 0.11 +%% since all 0.x versions are old enough, we only try to verify this against +%% 1.x or newer +t_direct_fetch_with_small_max_bytes(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major > 1; t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) -> Client = ?config(client), Topic = ?TOPIC, @@ -428,6 +443,11 @@ t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) -> ?assertEqual(Key, Msg#kafka_message.key), ok. +%% Starting from version 3, Kafka no longer returns incomplete batch +%% for Fetch request v0, cannot test max_bytes expansion anymore. +t_direct_fetch_expand_max_bytes(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major < 3; t_direct_fetch_expand_max_bytes({init, Config}) when is_list(Config) -> %% kafka returns empty message set when it's 0.9 %% or when fetch request sent was version 0 @@ -441,7 +461,7 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) -> Value = crypto:strong_rand_bytes(100), ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value), {ok, Offset} = brod:resolve_offset(?HOSTS, Topic, Partition, - ?OFFSET_LATEST, ?config(client_config)), + ?OFFSET_LATEST, ?config(client_config)), {ok, {_, [Msg]}} = brod:fetch({?HOSTS, ?config(client_config)}, Topic, Partition, Offset - 1, #{max_bytes => 13}), ?assertEqual(Key, Msg#kafka_message.key), ok. %% @doc Consumer should be smart enough to try greater max_bytes %% when it's not great enough to fetch one single message +t_consumer_max_bytes_too_small(kafka_version_match) -> + {Major, _Minor} = kafka_test_helper:kafka_version(), + Major < 3; t_consumer_max_bytes_too_small({init, Config}) -> meck:new(brod_kafka_request, [passthrough, no_passthrough_cover, no_history]), %% kafka returns empty message set when it's 0.9 @@ -843,7 +866,9 @@ wait_for_max_bytes_sequence([{Compare, MaxBytes} | Rest] = Waiting, Cnt) -> wait_for_max_bytes_sequence(Waiting, Cnt + 1); _ -> ct:fail("unexpected ~p, expecting ~p", [Bytes, {Compare, MaxBytes}]) - end + end; + Other -> + error(Other) after 3000 -> ct:fail("timeout", []) end... @@ -875,13 +900,6 @@ connect_txn_coordinator(TxnId, Config, RetriesLeft, _LastError) -> connect_txn_coordinator(TxnId, Config, RetriesLeft - 1, Reason) end. -has_txn() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> false; - "0.10" ++ _ -> false; - _ -> true - end. - consumer_config() -> [{max_wait_time, 1000}, {sleep_timeout, 10}]. retry(_F, 0) -> error(timeout); @@ -901,6 +919,14 @@ wait_for_consumer_connection(Consumer, OldConn) -> end, retry(F, 5). +has_txn() -> + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + false; + _ -> + true + end. + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/test/brod_demo_group_subscriber_koc.erl b/test/brod_demo_group_subscriber_koc.erl index 49407ea6..5680c780 100644 --- a/test/brod_demo_group_subscriber_koc.erl +++ b/test/brod_demo_group_subscriber_koc.erl @@ -218,10 +218,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). 
%%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_demo_group_subscriber_loc.erl b/test/brod_demo_group_subscriber_loc.erl index 39f5d543..b9f7539e 100644 --- a/test/brod_demo_group_subscriber_loc.erl +++ b/test/brod_demo_group_subscriber_loc.erl @@ -235,10 +235,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_demo_topic_subscriber.erl b/test/brod_demo_topic_subscriber.erl index 77de8500..e49d5b73 100644 --- a/test/brod_demo_topic_subscriber.erl +++ b/test/brod_demo_topic_subscriber.erl @@ -177,10 +177,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). commit_dir(Topic) -> filename:join(["/tmp", Topic]). diff --git a/test/brod_group_coordinator_SUITE.erl b/test/brod_group_coordinator_SUITE.erl index ace01079..01a3e47f 100644 --- a/test/brod_group_coordinator_SUITE.erl +++ b/test/brod_group_coordinator_SUITE.erl @@ -73,10 +73,7 @@ common_end_per_testcase(_Case, Config) when is_list(Config) -> ok = application:stop(brod). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Group coordinator callbacks ============================================== diff --git a/test/brod_offset_txn_SUITE.erl b/test/brod_offset_txn_SUITE.erl index 5565a8a4..aee57168 100644 --- a/test/brod_offset_txn_SUITE.erl +++ b/test/brod_offset_txn_SUITE.erl @@ -36,8 +36,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -72,17 +77,14 @@ end_per_testcase(_Case, Config) -> end, Config. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false end]. -client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. +client_config() -> kafka_test_helper:client_config(). init(GroupId, #{ client := Client diff --git a/test/brod_producer_SUITE.erl b/test/brod_producer_SUITE.erl index 558340f4..40004da3 100644 --- a/test/brod_producer_SUITE.erl +++ b/test/brod_producer_SUITE.erl @@ -452,10 +452,7 @@ t_configure_produce_api_vsn(Config) when is_list(Config) -> %%%_* Help functions =========================================================== client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). 
%% os:timestamp should be unique enough for testing make_unique_kv() -> diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 49f72e91..362518f0 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -113,7 +113,7 @@ handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter , worker_id = Ref } = State0) -> %% Participate in state continuity checks - ?tp(topic_subscriber_seen_info, + ?tp(topic_subscriber_seen_info, #{ partition => Partition , offset => Offset , msg => Msg diff --git a/test/brod_txn_SUITE.erl b/test/brod_txn_SUITE.erl index b5fb9294..8892695c 100644 --- a/test/brod_txn_SUITE.erl +++ b/test/brod_txn_SUITE.erl @@ -29,8 +29,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -80,10 +85,7 @@ all() -> [F || {F, _A} <- module_info(exports), end]. client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + []. subscriber_loop(TesterPid) -> receive diff --git a/test/brod_txn_processor_SUITE.erl b/test/brod_txn_processor_SUITE.erl index cc2d5e3d..29ee0e3c 100644 --- a/test/brod_txn_processor_SUITE.erl +++ b/test/brod_txn_processor_SUITE.erl @@ -35,8 +35,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -65,11 +70,7 @@ all() -> [F || {F, _A} <- module_info(exports), _ -> false end]. -client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. +client_config() -> kafka_test_helper:client_config(). rand() -> iolist_to_binary([base64:encode(crypto:strong_rand_bytes(8))]). diff --git a/test/data/ssl/README.md b/test/data/ssl/README.md new file mode 100644 index 00000000..70880474 --- /dev/null +++ b/test/data/ssl/README.md @@ -0,0 +1,3 @@ +This dir holds files for TLS/SSL tests. +The files are copied from Kafka docker image in the `make test-env` step. 
+See how the docker image is built here: https://github.com/zmstone/docker-kafka diff --git a/test/data/ssl/ca.pem b/test/data/ssl/ca.pem deleted file mode 100644 index 614aa1ef..00000000 --- a/test/data/ssl/ca.pem +++ /dev/null @@ -1,22 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDkzCCAnugAwIBAgIJAPjeRT8z4mElMA0GCSqGSIb3DQEBCwUAMGAxCzAJBgNV -BAYTAlNFMRIwEAYDVQQIDAlTdG9ja2hvbG0xEjAQBgNVBAcMCVN0b2NraG9sbTEN -MAsGA1UECgwEYnJvZDENMAsGA1UECwwEdGVzdDELMAkGA1UEAwwCKi4wHhcNMTYx -MTA0MTYxNDM2WhcNMjYxMTAyMTYxNDM2WjBgMQswCQYDVQQGEwJTRTESMBAGA1UE -CAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNVBAoMBGJyb2Qx -DTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMIIBIjANBgkqhkiG9w0BAQEFAAOC -AQ8AMIIBCgKCAQEAyIbBpX2DvhIbcXx1uho3Vm+hOLXrZJwNgVL3yDx/anGPvD2a -ZkUjdrJNh8jy5ZFA7jBQGLYIyMQYY8UMyAPIQbCsi0wvFhcWfv+/VTSOfgcK04D+ -QQRni8lkWI66oBcM02Wtwo3K5W7KWJ+LOAaV5hmSvLhcyIsSQC6MRBGRGJ89Oyza -7s1FrCY0HCa6BicY48sLTHTT8MScK5kOMO5KqMK8rY/dLRYynhC2K8/stzqN27HI -MoktDEzzCAfRaNfXE8o1NekJcpFLQNi9/nab7vcbWo/QmUCCF0Ny5BGWEx+GpEp9 -HjVM5KYAYlDqpMm3wttMs7dtU9lEXZk69uCejwIDAQABo1AwTjAdBgNVHQ4EFgQU -I1wMy5ObzZNi7qh3W9VSYKJRctYwHwYDVR0jBBgwFoAUI1wMy5ObzZNi7qh3W9VS -YKJRctYwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAGmyomSHF4sZh -yNIdDmeUp+nOFE6dXuK1vo8PlKrf+tyajNdic5ZMCF9BzbKSxFnejwHA2fzBlu6P -27kmmMPWuAhvcyxNciLZ/gGCn5gMPutKaweuaD6G93jkIngdXtbz+c6icpwsO9cK -Z0mdVuesJnmLQYLn9pHDzGUGYPFZpHVXwQzyAVw4m9T+aqKwwe/0dL1Z/8b/iuwN -K0S4/c7gLH8rB1jQisHomgHano43TzJq8ZFX7wF1E2tnHDdGk+uEZr5C7VPRgrF8 -/DhGGJnw3AoQgD5g1YqFGA5pA0AXr4RF27Y7bKYnzvbktOkfcNhw/4P2rKXWWs1Q -x2xsU3VaTQ== ------END CERTIFICATE----- diff --git a/test/data/ssl/client-crt.pem b/test/data/ssl/client-crt.pem deleted file mode 100644 index 45df2707..00000000 --- a/test/data/ssl/client-crt.pem +++ /dev/null @@ -1,20 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDPDCCAiQCCQDU7hj/emVKfDANBgkqhkiG9w0BAQsFADBgMQswCQYDVQQGEwJT -RTESMBAGA1UECAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNV -BAoMBGJyb2QxDTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMB4XDTE2MTEwNDE2 -MTQzNloXDTI2MTEwMjE2MTQzNlowYDELMAkGA1UEBhMCU0UxEjAQBgNVBAgMCVN0 -b2NraG9sbTESMBAGA1UEBwwJU3RvY2tob2xtMQ0wCwYDVQQKDARicm9kMQ0wCwYD -VQQLDAR0ZXN0MQswCQYDVQQDDAIqLjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC -AQoCggEBAKtN+UzF8g9JRhKXn/rnm4jCr26+HpboDQYxG1HSCwwWskdOMK1b/8w4 -ipNzpoV16teRW5AVdq5Z6DDzBE5X43rrJZ9+x6pd25mVyktmwAIxEYscLtxN1UoL -a5EF13D8UPWCyzylThhUwi67bHvbLeWzAKoccKqdV/5ZNjFnqt9Q9seFOxyXNcFE -/qfUQTfkcL4rei2dgkFPFOXbF2rKRgMaiseyVAJP0G8AcsCkQvaYnkQrJ8nAZBtI -vZmq2og9PW7Z8rEbm9TVLnLNtEE5Lx2S1SQS9QPccYJDAyQJLCOw2ikGQPgtDfbs -KILEp+MChTWgEeb/LBlN/qa+zDraDm0CAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA -EdsizFjP+hWSa5A0UFRIETvAztpTd+pjWWVv3DwCCRc2aMys+GYnR5fkHtnwKr7u -diZ8SSMZQFhlxA9MRNe8++wKeKeCzqrwIV1+mQcGqrJLl6sxW5TcMs/bRy5BPwZJ -RGlcz6HdLY8UBZzY2Qy2A4VecqwNe07Vg+7Yebui4w09pt5045S/q33/arb/LKP+ -1CbCjNyF3QC0aww+YgML+PgjnNtqbO/85qV424/dMX+aNAotQ/zBdEfEXyFaCoAE -yCHA99FnhHsQ9gwv9vhMLAX+yiBIEoh3e18EtmZdsvsTpDd1KI4nrh44TJfEY65+ -fNeAXYygkzsN14bbk9PgMw== ------END CERTIFICATE----- diff --git a/test/data/ssl/client-key.pem b/test/data/ssl/client-key.pem deleted file mode 100644 index 641967ab..00000000 --- a/test/data/ssl/client-key.pem +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCrTflMxfIPSUYS -l5/655uIwq9uvh6W6A0GMRtR0gsMFrJHTjCtW//MOIqTc6aFderXkVuQFXauWegw -8wROV+N66yWffseqXduZlcpLZsACMRGLHC7cTdVKC2uRBddw/FD1gss8pU4YVMIu -u2x72y3lswCqHHCqnVf+WTYxZ6rfUPbHhTsclzXBRP6n1EE35HC+K3otnYJBTxTl -2xdqykYDGorHslQCT9BvAHLApEL2mJ5EKyfJwGQbSL2ZqtqIPT1u2fKxG5vU1S5y 
-zbRBOS8dktUkEvUD3HGCQwMkCSwjsNopBkD4LQ327CiCxKfjAoU1oBHm/ywZTf6m -vsw62g5tAgMBAAECggEADFm50Jww4INC5xJBeYB7STfoGA7i+7RNRBYERzjijQOR -5OwxPD52yc2FyC29Yr/mp5YWSOQTQ2y9/dF3jQJvJyyO8NneIV1U+NTA2gDVdRL+ -lc35Xu7JouYB4lnOd5npaFn+tyef4scxnNbscl2SCI6ITLtyMAraDj92VceInUMF -28srCTMdjbhVLpeq80qdeDVnyzlmua1W8pjR1lNXY2IECS9gTp6+JLiMQ0FJlC9V -r+U5iAoqLCNh+QpdM+2Z8kbkKA5PqsWcAhx+dTTkbRPp59r7Qd2xtxde5PGlm6zp -cqXgbWaXCMlbL5C7eOyPfLty3+KPrR6LGW6jGEqioQKBgQDcK2LGx/1PE2Y27p+O -RImN5SYERiRnYen7bm1CBAoH1J5LDxWWcf8Bz8/y4bNvEZJVosvPDRoNilI4RTYD -JiJw/qXio6FG78yIzvCK0WLIPgq6stufdbd/+UsNrDbGTuhk/qti8TSckEEgrUWg -U0NgEc/zyIMQK/4mZSgqeUpuxQKBgQDHLsxRT3Ile4sT2arxv0/KzSmnEL9hCAa9 -Cf+N0mWPrt6rzJUTD0+FBToXGP3k4auKETRb3XHvSHCwwl6Bi+NTPpVYqBT53jEv -fSb6bMjSlZyja+miVh/7TI2on7keus19XtZyks7PKoHa+i4w61zy5jbBdBC/kU1y -8O3HXF4biQKBgQCI6/5o6vTQmZrmrK3TtzHoacqju89l79GoyPrvpD1ss0CiI0Zk -oo5ZXRjQzqZde4sK8MxY/qfmJdCOKBS4Dp46sVMOyH5C9Fy59CBJ5H/PUi4v/41v -9LBiyPFxFlmWKHqEXJDPXnw+pcOrA7caRs3O0CUIUfmYNBPBYwWArJ+qlQKBgFpO -25BaJvTbqNkdLaZiCTl3/9ShgUPrMbLwH5AbvrSAorDeFxEHNhSnpAjo6eSmdPIq -jsTACHJnM8DQv6yY0j7h9zC1NJ19omtXoR6VyA/CibyGpu1VgzabJPc5Q+Os6pJX -N3/HFEFVkn7IQ70mWYQ/4L+hch6JMMZWeliTho+RAoGADcqzTMLtp7kRH8LQcq1n -oCE2FYJPvpd8PWlMCZ0ewSk6CbIgLvwJ+Hw0040m11HlCG7xVNdJV0rAU68Z7txm -pYIXL3D9MlJWCWMjZ7k11fuN1EtPLYYhMgS7FhADdUfFhnRGDkF2LnbvZIh3UtN6 -H5khVwyCU9LwQoxKfTmuxnY= -----END PRIVATE KEY----- diff --git a/test/kafka_test_helper.erl b/test/kafka_test_helper.erl index 56ac9b4f..fd6f0ce3 100644 --- a/test/kafka_test_helper.erl +++ b/test/kafka_test_helper.erl @@ -17,6 +17,7 @@ , client_config/0 , bootstrap_hosts/0 , kill_process/2 + , kafka_version/0 ]). -include("brod_test_macros.hrl"). @@ -82,9 +83,30 @@ produce_payloads(Topic, Partition, Config) -> {LastOffset, Payloads}. client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] + case kafka_version() of + {0, 9} -> [{query_api_versions, false}]; + _ -> [] + end. + +maybe_zookeeper() -> + case kafka_version() of + {3, _} -> + %% Kafka 2.2 started supporting --bootstrap-server, but 2.x still supports --zookeeper + %% Starting from 3.0, --zookeeper is no longer supported, must use --bootstrap-server + "--bootstrap-server localhost:9092"; + _ -> + "--zookeeper localhost:2181" + end. + +kafka_version() -> + VsnStr = os:getenv("KAFKA_VERSION"), + case VsnStr =:= "" orelse VsnStr =:= false of + true -> + ct:pal("KAFKA_VERSION is not set, defaulting to 3.6", []), + {3, 6}; + false -> + [Major, Minor | _] = string:tokens(VsnStr, "."), + {list_to_integer(Major), list_to_integer(Minor)} end. prepare_topic(Topic) when is_binary(Topic) -> @@ -97,12 +119,12 @@ prepare_topic({Topic, NumPartitions, NumReplicas}) -> ok = brod:start_producer(?TEST_CLIENT_ID, Topic, _ProducerConfig = []). delete_topic(Name) -> - Delete = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost " + Delete = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++ " --delete --topic ~s", exec_in_kafka_container(Delete, [Name]). create_topic(Name, NumPartitions, NumReplicas) -> - Create = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost " + Create = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++ " --create --partitions ~p --replication-factor ~p" " --topic ~s --config min.insync.replicas=1", exec_in_kafka_container(Create, [NumPartitions, NumReplicas, Name]). 
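[Editor's note: to illustrate how the `kafka_version/0` helper added above combines with the `kafka_version_match` convention introduced in `brod_consumer_SUITE`, here is a minimal sketch of a version-gated CT suite. The module name `version_gated_SUITE` and the test case are hypothetical, not part of these patches.]

```erlang
%% Hypothetical example module, not part of this patch series.
-module(version_gated_SUITE).
-export([all/0, t_requires_txn/1]).

all() ->
  Cases = [t_requires_txn],
  %% Mirror the filter from brod_consumer_SUITE: keep a case when its
  %% kafka_version_match clause returns true; cases without such a
  %% clause raise function_clause and are kept by default.
  lists:filter(
    fun(Case) ->
        try ?MODULE:Case(kafka_version_match)
        catch _:_ -> true
        end
    end, Cases).

%% Transactions are only available on Kafka 0.11 or newer.
t_requires_txn(kafka_version_match) ->
  case kafka_test_helper:kafka_version() of
    {0, Minor} -> Minor >= 11;
    {_, _} -> true
  end;
t_requires_txn(Config) when is_list(Config) ->
  %% the actual test body would go here
  ok.
```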
From 28a3a2e8ff5d17bdd4f4c432e38d713803845671 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 17 Jun 2024 22:53:30 +0200 Subject: [PATCH 7/8] test: make sure max_bytes is at least 12 bytes Mostly because Kafka 0.9 does not cope well when max_bytes is 0. Also, it does not make sense to fetch less than 12 bytes because it needs at least 12 bytes to figure out the remaining bytes of a message set. --- src/brod_consumer.erl | 4 +++- test/brod_consumer_SUITE.erl | 8 +++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 09f91c76..a4c39d5c 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -734,6 +734,8 @@ send_fetch_request(#state{ begin_offset = BeginOffset } = State) -> (is_integer(BeginOffset) andalso BeginOffset >= 0) orelse erlang:error({bad_begin_offset, BeginOffset}), + %% MaxBytes=0 will make no progress when it's Kafka 0.9 + MaxBytes = max(12, State#state.max_bytes), Request = brod_kafka_request:fetch(Connection, State#state.topic, State#state.partition, State#state.begin_offset, State#state.max_wait_time, State#state.min_bytes, - State#state.max_bytes, + MaxBytes, State#state.isolation_level), case kpro:request_async(Connection, Request) of ok -> diff --git a/test/brod_consumer_SUITE.erl b/test/brod_consumer_SUITE.erl index b5e6db8c..93774d1a 100644 --- a/test/brod_consumer_SUITE.erl +++ b/test/brod_consumer_SUITE.erl @@ -487,9 +487,8 @@ t_consumer_max_bytes_too_small(Config) -> brod:unsubscribe(Client, ?TOPIC, Partition), Key = make_unique_key(), ValueBytes = 2000, - MaxBytes1 = 8, %% too small for even the header - MaxBytes2 = 12, %% too small but message size is fetched - MaxBytes3 = size(Key) + ValueBytes, + MaxBytes1 = 12, %% too small for even the header + MaxBytes2 = size(Key) + ValueBytes, Tester = self(), F = fun(Conn, Topic, Partition1, BeginOffset, MaxWait, MinBytes, MaxBytes, IsolationLevel) -> @@ -505,8 +504,7 @@ t_consumer_max_bytes_too_small(Config) -> brod:subscribe(Client, self(), ?TOPIC, Partition, Options), ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value), ok = wait_for_max_bytes_sequence([{'=', MaxBytes1}, - {'=', MaxBytes2}, - {'>', MaxBytes3}], + {'>', MaxBytes2}], _TriedCount = 0), ?WAIT_ONLY( {ConsumerPid, #kafka_message_set{messages = Messages}}, From d5e92b4bed13606349ab6098ad76e6b877e201f8 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 17 Jun 2024 22:56:03 +0200 Subject: [PATCH 8/8] test: stabilize brod_group_subscriber_SUITE:t_async_commit --- test/brod_group_subscriber_SUITE.erl | 3 ++- test/brod_topic_subscriber_SUITE.erl | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/test/brod_group_subscriber_SUITE.erl b/test/brod_group_subscriber_SUITE.erl index d55dea72..7ad69d14 100644 --- a/test/brod_group_subscriber_SUITE.erl +++ b/test/brod_group_subscriber_SUITE.erl @@ -550,6 +550,7 @@ t_async_commit({init, Config}) -> integer_to_list(rand:uniform(1000000000))), [{group_id, GroupId} | Config]; t_async_commit(Config) when is_list(Config) -> + GroupId = ?config(group_id), Behavior = ?config(behavior), Topic = ?topic, Partition = 0, @@ -562,7 +563,7 @@ , {partition_restart_delay_seconds, 1} , {begin_offset, Offset} ], - {ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], + {ok, SubscriberPid} = start_subscriber(GroupId, Config, [Topic], GroupConfig, ConsumerConfig, InitArgs), SubscriberPid diff --git 
a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 362518f0..db61189d 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -211,7 +211,7 @@ t_consumer_ack_via_message_passing(Config) when is_list(Config) -> Partition = 0, SendFun = fun(I) -> - produce({?topic, Partition}, <<I>>) + produce({?topic, Partition}, integer_to_binary(I)) end, ?check_trace( %% Run stage: begin @@ -225,12 +225,12 @@ t_consumer_ack_via_message_passing(Config) when is_list(Config) -> {ok, SubscriberPid} = brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig, ?MODULE, InitArgs), - {ok, _} = wait_message(<<1>>), + {ok, _} = wait_message(<<"1">>), %% ack_offset allows consumer to proceed to message 2 SubscriberPid ! {ack_offset, 0, Offset0}, - {ok, _} = wait_message(<<2>>), + {ok, _} = wait_message(<<"2">>), ok = brod_topic_subscriber:stop(SubscriberPid), - _Expected = [<<1>>, <<2>>] + _Expected = [<<"1">>, <<"2">>] end, %% Check stage: fun(Expected, Trace) ->