diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 1ab8ffd0..209da873 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -7,8 +7,8 @@ on:
     branches:
       - master
 env:
-  OTP_VERSION: "24.1"
-  REBAR_VERSION: "3.17.0"
+  OTP_VERSION: "26"
+  REBAR_VERSION: "3.20.0"
 jobs:
   lint:
@@ -44,8 +44,8 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        otp: ["24.1", "23.3.4.7", "22.3.4.21"]
-        kafka: ["2.4", "1.1", "0.11"]
+        otp: ["26"]
+        kafka: ["0.9", "0.10", "0.11", "2.8", "1.1", "3.6"]
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -69,7 +69,8 @@ jobs:
         run: |
           export KAFKA_VERSION=${{ matrix.kafka }}
           echo "Running Kafka ${KAFKA_VERSION}"
-          scripts/setup-test-env.sh && rebar3 do ct,eunit
+          make test-env
+          make t
       - name: Store test logs
         uses: actions/upload-artifact@v1
         if: always()
diff --git a/.gitignore b/.gitignore
index 516ec934..0e07c25f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,4 +23,6 @@ _rel/
 *.log
 relx
 docker/
-TAGS
\ No newline at end of file
+TAGS
+.vscode/
+test/data/ssl/*.pem
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6707586a..c73416cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,21 @@
 # Changelog
+- 3.19.1
+  - Made brod-cli work on OTP 26. [PR#582](https://github.com/kafka4beam/brod/pull/582)
+  - The `--ssl` option is now mandatory if TLS is to be used (previously it could be derived from the `--cacertfile` option)
+  - The TLS version now defaults to 1.2; added `--ssl-versions` to support explicitly setting TLS 1.3
+
+- 3.19.0
+  - Forward unhandled messages in topic/group consumer processes to handle_info/2 callbacks
+    in order to support arbitrary message passing [PR#580](https://github.com/kafka4beam/brod/pull/580)
+
+- 3.18.0
+  - Add transactional APIs. [PR#549](https://github.com/kafka4beam/brod/pull/549)
+  - Fix unnecessary group coordinator restart due to `hb_timeout` exception. [PR#578](https://github.com/kafka4beam/brod/pull/578)
+  - Changed supervisor3 progress log level from `info` to `debug`. [PR#572](https://github.com/kafka4beam/brod/pull/572)
+  - Type spec fix. [PR#571](https://github.com/kafka4beam/brod/pull/571)
+  - Remove unused macro.
[PR#575](https://github.com/kafka4beam/brod/pull/575)
+
 - 3.17.1
   - Upgrade `kafka_protocol` from 4.1.3 to 4.1.5
   - Allow space after `,` in comma-separated bootstrapping host:port list
diff --git a/Makefile b/Makefile
index 426e39a2..7b7d6db9 100644
--- a/Makefile
+++ b/Makefile
@@ -1,3 +1,5 @@
+KAFKA_VERSION ?= 3.6
+export KAFKA_VERSION
 all: compile
 
 compile:
@@ -8,6 +10,10 @@ lint:
 
 test-env:
 	@./scripts/setup-test-env.sh
+	@mkdir -p ./test/data/ssl
+	@docker cp kafka-1:/localhost-ca-crt.pem ./test/data/ssl/ca.pem
+	@docker cp kafka-1:/localhost-client-key.pem ./test/data/ssl/client-key.pem
+	@docker cp kafka-1:/localhost-client-crt.pem ./test/data/ssl/client-crt.pem
 
 ut:
 	@rebar3 eunit -v --cover_export_name ut-$(KAFKA_VERSION)
diff --git a/guides/examples/Authentication.md b/guides/examples/Authentication.md
index e593514b..a3065655 100644
--- a/guides/examples/Authentication.md
+++ b/guides/examples/Authentication.md
@@ -60,6 +60,7 @@ For more info see the Erlang Ecosystem Foundation's [server certificate verifica
 , { depth, 3 }
 , { customize_hostname_check,
     [{match_fun, public_key:pkix_verify_hostname_match_fun(https)}]}
+, {versions, ['tlsv1.3', 'tlsv1.2']}
 ]}
 , { sasl, {plain, "GFRW5BSQHKEH0TSG", "GrL3CNTkLhsvtBr8srGn0VilMpgDb4lPD"}}
 ]
diff --git a/priv/ssl/ca.crt b/priv/ssl/ca.crt
deleted file mode 100644
index 614aa1ef..00000000
--- a/priv/ssl/ca.crt
+++ /dev/null
@@ -1,22 +0,0 @@
------BEGIN CERTIFICATE-----
-MIIDkzCCAnugAwIBAgIJAPjeRT8z4mElMA0GCSqGSIb3DQEBCwUAMGAxCzAJBgNV
-BAYTAlNFMRIwEAYDVQQIDAlTdG9ja2hvbG0xEjAQBgNVBAcMCVN0b2NraG9sbTEN
-MAsGA1UECgwEYnJvZDENMAsGA1UECwwEdGVzdDELMAkGA1UEAwwCKi4wHhcNMTYx
-MTA0MTYxNDM2WhcNMjYxMTAyMTYxNDM2WjBgMQswCQYDVQQGEwJTRTESMBAGA1UE
-CAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNVBAoMBGJyb2Qx
-DTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMIIBIjANBgkqhkiG9w0BAQEFAAOC
-AQ8AMIIBCgKCAQEAyIbBpX2DvhIbcXx1uho3Vm+hOLXrZJwNgVL3yDx/anGPvD2a
-ZkUjdrJNh8jy5ZFA7jBQGLYIyMQYY8UMyAPIQbCsi0wvFhcWfv+/VTSOfgcK04D+
-QQRni8lkWI66oBcM02Wtwo3K5W7KWJ+LOAaV5hmSvLhcyIsSQC6MRBGRGJ89Oyza
-7s1FrCY0HCa6BicY48sLTHTT8MScK5kOMO5KqMK8rY/dLRYynhC2K8/stzqN27HI
-MoktDEzzCAfRaNfXE8o1NekJcpFLQNi9/nab7vcbWo/QmUCCF0Ny5BGWEx+GpEp9
-HjVM5KYAYlDqpMm3wttMs7dtU9lEXZk69uCejwIDAQABo1AwTjAdBgNVHQ4EFgQU
-I1wMy5ObzZNi7qh3W9VSYKJRctYwHwYDVR0jBBgwFoAUI1wMy5ObzZNi7qh3W9VS
-YKJRctYwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAGmyomSHF4sZh
-yNIdDmeUp+nOFE6dXuK1vo8PlKrf+tyajNdic5ZMCF9BzbKSxFnejwHA2fzBlu6P
-27kmmMPWuAhvcyxNciLZ/gGCn5gMPutKaweuaD6G93jkIngdXtbz+c6icpwsO9cK
-Z0mdVuesJnmLQYLn9pHDzGUGYPFZpHVXwQzyAVw4m9T+aqKwwe/0dL1Z/8b/iuwN
-K0S4/c7gLH8rB1jQisHomgHano43TzJq8ZFX7wF1E2tnHDdGk+uEZr5C7VPRgrF8
-/DhGGJnw3AoQgD5g1YqFGA5pA0AXr4RF27Y7bKYnzvbktOkfcNhw/4P2rKXWWs1Q
-x2xsU3VaTQ==
------END CERTIFICATE-----
diff --git a/priv/ssl/client.crt b/priv/ssl/client.crt
deleted file mode 100644
index 45df2707..00000000
--- a/priv/ssl/client.crt
+++ /dev/null
@@ -1,20 +0,0 @@
------BEGIN CERTIFICATE-----
-MIIDPDCCAiQCCQDU7hj/emVKfDANBgkqhkiG9w0BAQsFADBgMQswCQYDVQQGEwJT
-RTESMBAGA1UECAwJU3RvY2tob2xtMRIwEAYDVQQHDAlTdG9ja2hvbG0xDTALBgNV
-BAoMBGJyb2QxDTALBgNVBAsMBHRlc3QxCzAJBgNVBAMMAiouMB4XDTE2MTEwNDE2
-MTQzNloXDTI2MTEwMjE2MTQzNlowYDELMAkGA1UEBhMCU0UxEjAQBgNVBAgMCVN0
-b2NraG9sbTESMBAGA1UEBwwJU3RvY2tob2xtMQ0wCwYDVQQKDARicm9kMQ0wCwYD
-VQQLDAR0ZXN0MQswCQYDVQQDDAIqLjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
-AQoCggEBAKtN+UzF8g9JRhKXn/rnm4jCr26+HpboDQYxG1HSCwwWskdOMK1b/8w4
-ipNzpoV16teRW5AVdq5Z6DDzBE5X43rrJZ9+x6pd25mVyktmwAIxEYscLtxN1UoL
-a5EF13D8UPWCyzylThhUwi67bHvbLeWzAKoccKqdV/5ZNjFnqt9Q9seFOxyXNcFE
-/qfUQTfkcL4rei2dgkFPFOXbF2rKRgMaiseyVAJP0G8AcsCkQvaYnkQrJ8nAZBtI -vZmq2og9PW7Z8rEbm9TVLnLNtEE5Lx2S1SQS9QPccYJDAyQJLCOw2ikGQPgtDfbs -KILEp+MChTWgEeb/LBlN/qa+zDraDm0CAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA -EdsizFjP+hWSa5A0UFRIETvAztpTd+pjWWVv3DwCCRc2aMys+GYnR5fkHtnwKr7u -diZ8SSMZQFhlxA9MRNe8++wKeKeCzqrwIV1+mQcGqrJLl6sxW5TcMs/bRy5BPwZJ -RGlcz6HdLY8UBZzY2Qy2A4VecqwNe07Vg+7Yebui4w09pt5045S/q33/arb/LKP+ -1CbCjNyF3QC0aww+YgML+PgjnNtqbO/85qV424/dMX+aNAotQ/zBdEfEXyFaCoAE -yCHA99FnhHsQ9gwv9vhMLAX+yiBIEoh3e18EtmZdsvsTpDd1KI4nrh44TJfEY65+ -fNeAXYygkzsN14bbk9PgMw== ------END CERTIFICATE----- diff --git a/priv/ssl/client.key b/priv/ssl/client.key deleted file mode 100644 index 641967ab..00000000 --- a/priv/ssl/client.key +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCrTflMxfIPSUYS -l5/655uIwq9uvh6W6A0GMRtR0gsMFrJHTjCtW//MOIqTc6aFderXkVuQFXauWegw -8wROV+N66yWffseqXduZlcpLZsACMRGLHC7cTdVKC2uRBddw/FD1gss8pU4YVMIu -u2x72y3lswCqHHCqnVf+WTYxZ6rfUPbHhTsclzXBRP6n1EE35HC+K3otnYJBTxTl -2xdqykYDGorHslQCT9BvAHLApEL2mJ5EKyfJwGQbSL2ZqtqIPT1u2fKxG5vU1S5y -zbRBOS8dktUkEvUD3HGCQwMkCSwjsNopBkD4LQ327CiCxKfjAoU1oBHm/ywZTf6m -vsw62g5tAgMBAAECggEADFm50Jww4INC5xJBeYB7STfoGA7i+7RNRBYERzjijQOR -5OwxPD52yc2FyC29Yr/mp5YWSOQTQ2y9/dF3jQJvJyyO8NneIV1U+NTA2gDVdRL+ -lc35Xu7JouYB4lnOd5npaFn+tyef4scxnNbscl2SCI6ITLtyMAraDj92VceInUMF -28srCTMdjbhVLpeq80qdeDVnyzlmua1W8pjR1lNXY2IECS9gTp6+JLiMQ0FJlC9V -r+U5iAoqLCNh+QpdM+2Z8kbkKA5PqsWcAhx+dTTkbRPp59r7Qd2xtxde5PGlm6zp -cqXgbWaXCMlbL5C7eOyPfLty3+KPrR6LGW6jGEqioQKBgQDcK2LGx/1PE2Y27p+O -RImN5SYERiRnYen7bm1CBAoH1J5LDxWWcf8Bz8/y4bNvEZJVosvPDRoNilI4RTYD -JiJw/qXio6FG78yIzvCK0WLIPgq6stufdbd/+UsNrDbGTuhk/qti8TSckEEgrUWg -U0NgEc/zyIMQK/4mZSgqeUpuxQKBgQDHLsxRT3Ile4sT2arxv0/KzSmnEL9hCAa9 -Cf+N0mWPrt6rzJUTD0+FBToXGP3k4auKETRb3XHvSHCwwl6Bi+NTPpVYqBT53jEv -fSb6bMjSlZyja+miVh/7TI2on7keus19XtZyks7PKoHa+i4w61zy5jbBdBC/kU1y -8O3HXF4biQKBgQCI6/5o6vTQmZrmrK3TtzHoacqju89l79GoyPrvpD1ss0CiI0Zk -oo5ZXRjQzqZde4sK8MxY/qfmJdCOKBS4Dp46sVMOyH5C9Fy59CBJ5H/PUi4v/41v -9LBiyPFxFlmWKHqEXJDPXnw+pcOrA7caRs3O0CUIUfmYNBPBYwWArJ+qlQKBgFpO -25BaJvTbqNkdLaZiCTl3/9ShgUPrMbLwH5AbvrSAorDeFxEHNhSnpAjo6eSmdPIq -jsTACHJnM8DQv6yY0j7h9zC1NJ19omtXoR6VyA/CibyGpu1VgzabJPc5Q+Os6pJX -N3/HFEFVkn7IQ70mWYQ/4L+hch6JMMZWeliTho+RAoGADcqzTMLtp7kRH8LQcq1n -oCE2FYJPvpd8PWlMCZ0ewSk6CbIgLvwJ+Hw0040m11HlCG7xVNdJV0rAU68Z7txm -pYIXL3D9MlJWCWMjZ7k11fuN1EtPLYYhMgS7FhADdUfFhnRGDkF2LnbvZIh3UtN6 -H5khVwyCU9LwQoxKfTmuxnY= ------END PRIVATE KEY----- diff --git a/rebar.config b/rebar.config index 9f3e53d0..a252dcf6 100644 --- a/rebar.config +++ b/rebar.config @@ -16,9 +16,9 @@ {relx, [{release, {brod, "i"}, % release the interactive shell as brod-i [brod, jsone, docopt]}, {include_erts, true}, - {overlay, [{copy, "scripts/brod", "bin"}, - {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin"}, - {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin"} + {overlay, [{copy, "scripts/brod", "bin/"}, + {copy, "{{lib_dirs}}/crc32cer/priv/crc32cer*.so", "bin/"}, + {copy, "{{lib_dirs}}/snappyer/priv/snappyer.so", "bin/"} ]} ]}]}, {test, [ @@ -27,8 +27,8 @@ , {jsone, "1.7.0"} , {meck, "0.9.2"} , {proper, "1.4.0"} - , {snabbkaffe, "1.0.1"} , {snappyer, "1.2.8"} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {branch, "1.0.8"}}} ]}, {erl_opts, [warnings_as_errors, {d, build_brod_cli}]} ]} diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 272f0b4f..45839f99 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -1,35 +1,17 @@ version: "2" services: - 
pause:
-    image: "gcr.io/google_containers/pause-amd64:3.0"
-    networks:
-      - pausenet
-    ports:
-      - "2181:2181"
-      - "9092:9092"
-      - "9093:9093"
-      - "9094:9094"
-      - "9095:9095"
-      - "9192:9192"
-      - "9193:9193"
-      - "9194:9194"
-      - "9195:9195"
-    container_name: pause
   zookeeper:
-    depends_on:
-      - pause
-    image: "zmstone/kafka:${KAFKA_VERSION}"
+    image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
     container_name: zookeeper
     command: run zookeeper
-    network_mode: service:pause
+    network_mode: host
   kafka_1:
     depends_on:
-      - pause
       - zookeeper
-    image: "zmstone/kafka:${KAFKA_VERSION}"
+    image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
     container_name: "kafka-1"
-    network_mode: service:pause
+    network_mode: host
     environment:
       BROKER_ID: 0
       PLAINTEXT_PORT: 9092
@@ -40,11 +22,10 @@ services:
       ZOOKEEPER_CONNECT: "localhost:2181"
   kafka_2:
     depends_on:
-      - pause
       - zookeeper
-    image: "zmstone/kafka:${KAFKA_VERSION}"
+    image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
     container_name: "kafka-2"
-    network_mode: service:pause
+    network_mode: host
     environment:
       BROKER_ID: 1
       PLAINTEXT_PORT: 9192
@@ -53,6 +34,3 @@ services:
       SASL_PLAINTEXT_PORT: 9195
       ADVERTISED_HOSTNAME: localhost
       ZOOKEEPER_CONNECT: "localhost:2181"
-
-networks:
-  pausenet:
diff --git a/scripts/setup-test-env.sh b/scripts/setup-test-env.sh
index b39fe63e..25264fd1 100755
--- a/scripts/setup-test-env.sh
+++ b/scripts/setup-test-env.sh
@@ -1,5 +1,9 @@
 #!/bin/bash -eu
 
+if [ -n "${DEBUG:-}" ]; then
+  set -x
+fi
+
 docker ps > /dev/null || {
   echo "You must be a member of docker group to run this script"
   exit 1
@@ -18,46 +22,77 @@ function docker_compose {
   fi
 }
 
-VERSION=${KAFKA_VERSION:-1.1}
-if [ -z $VERSION ]; then VERSION=$1; fi
+KAFKA_VERSION=${KAFKA_VERSION:-3.6}
+if [ -z $KAFKA_VERSION ]; then KAFKA_VERSION=$1; fi
 
-case $VERSION in
+case $KAFKA_VERSION in
+  0.9*)
+    KAFKA_VERSION="0.9";;
   0.10*)
-    VERSION="0.10";;
+    KAFKA_VERSION="0.10";;
   0.11*)
-    VERSION="0.11";;
+    KAFKA_VERSION="0.11";;
   1.*)
-    VERSION="1.1";;
+    KAFKA_VERSION="1.1";;
   2.*)
-    VERSION="2.4";;
+    KAFKA_VERSION="2.8";;
+  3.*)
+    KAFKA_VERSION="3.6";;
   *)
-    VERSION="2.4";;
+    KAFKA_VERSION="3.6";;
 esac
 
-echo "Using KAFKA_VERSION=$VERSION"
-export KAFKA_VERSION=$VERSION
+export KAFKA_IMAGE_VERSION="1.1-${KAFKA_VERSION}"
+echo "env KAFKA_IMAGE_VERSION=$KAFKA_IMAGE_VERSION"
 
 TD="$(cd "$(dirname "$0")" && pwd)"
 
 docker_compose -f $TD/docker-compose.yml down || true
 docker_compose -f $TD/docker-compose.yml up -d
 
+if [[ "$KAFKA_VERSION" == 2* ]] || [[ "$KAFKA_VERSION" == 3* ]]; then
+  MAYBE_ZOOKEEPER="--bootstrap-server localhost:9092"
+else
+  MAYBE_ZOOKEEPER="--zookeeper localhost:2181"
+fi
 
-n=0
-while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do
-  if [ $n -gt 4 ]; then
-    echo "timeout waiting for kakfa_1"
-    exit 1
+TOPIC_LIST_CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --list"
+MAX_WAIT_SEC=10
+
+function wait_for_kafka {
+  local which_kafka="$1"
+  local n=0
+  local port=':9092'
+  local topic_list listener
+  if [ "$which_kafka" = 'kafka-2' ]; then
+    port=':9192'
   fi
-  n=$(( n + 1 ))
-  sleep 1
-done
+  while true; do
+    listener="$(netstat -tnlp 2>&1 | grep $port || true)"
+    if [ "$listener" != '' ]; then
+      topic_list="$(docker exec $which_kafka $TOPIC_LIST_CMD 2>&1)"
+      if [ "${topic_list-}" = '' ]; then
+        break
+      fi
+    fi
+    if [ $n -gt $MAX_WAIT_SEC ]; then
+      echo "timeout waiting for $which_kafka"
+      echo "last print: ${topic_list:-}"
+      exit 1
+    fi
+    n=$(( n + 1 ))
+    sleep 1
+  done
+}
+
+wait_for_kafka kafka-1
+wait_for_kafka kafka-2
 
 function create_topic {
TOPIC_NAME="$1" PARTITIONS="${2:-1}" REPLICAS="${3:-1}" - CMD="/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" + CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1" docker exec kafka-1 bash -c "$CMD" } @@ -80,7 +115,7 @@ create_topic "brod_compression_SUITE" create_topic "lz4-test" create_topic "test-topic" -if [[ "$KAFKA_VERSION" = 2* ]]; then +if [[ "$KAFKA_VERSION" = 2* ]] || [[ "$KAFKA_VERSION" = 3* ]]; then MAYBE_NEW_CONSUMER="" else MAYBE_NEW_CONSUMER="--new-consumer" @@ -90,5 +125,5 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l # for kafka 0.11 or later, add sasl-scram test credentials if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then - docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice + docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh $MAYBE_ZOOKEEPER --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice fi diff --git a/src/brod_cli.erl b/src/brod_cli.erl index b28b6302..e3f3cef8 100644 --- a/src/brod_cli.erl +++ b/src/brod_cli.erl @@ -61,6 +61,8 @@ commands: %% NOTE: bad indentation at the first line is intended -define(COMMAND_COMMON_OPTIONS, " --ssl Use TLS, validate server using trusted CAs + --ssl-versions= Specify SSL versions. Comma separated versions, + e.g. 1.3,1.2 --cacertfile= Use TLS, validate server using the given certificate --certfile= Client certificate in case client authentication is enabled in brokers @@ -365,6 +367,7 @@ main(Command, Doc, Args, Stop, LogLevel) -> C1 : E1 ?BIND_STACKTRACE(Stack1) -> ?GET_STACKTRACE(Stack1), verbose("~p:~p\n~p\n", [C1, E1, Stack1]), + io:format(user, "~p~n", [{C1, E1, Stack1}]), ?STOP(Stop) end, case LogLevel =:= ?LOG_LEVEL_QUIET of @@ -1125,20 +1128,25 @@ parse_offset_time(T) -> int(T). parse_connection_config(Args) -> SslBool = parse(Args, "--ssl", fun parse_boolean/1), + SslVersions = parse(Args, "--ssl-versions", fun parse_ssl_versions/1), CaCertFile = parse(Args, "--cacertfile", fun parse_file/1), CertFile = parse(Args, "--certfile", fun parse_file/1), KeyFile = parse(Args, "--keyfile", fun parse_file/1), FilterPred = fun({_, V}) -> V =/= ?undef end, SslOpt = - case CaCertFile of - ?undef -> - SslBool; - _ -> - Files = + case SslBool of + true -> + Opts = [{cacertfile, CaCertFile}, {certfile, CertFile}, - {keyfile, KeyFile}], - lists:filter(FilterPred, Files) + {keyfile, KeyFile}, + {versions, SslVersions}, + %% TODO: verify_peer if cacertfile is provided + {verify, verify_none} + ], + lists:filter(FilterPred, Opts); + false -> + false end, SaslPlain = parse(Args, "--sasl-plain", fun parse_file/1), SaslScram256 = parse(Args, "--scram256", fun parse_file/1), @@ -1157,12 +1165,31 @@ parse_boolean(true) -> true; parse_boolean(false) -> false; parse_boolean("true") -> true; parse_boolean("false") -> false; -parse_boolean(?undef) -> ?undef. +parse_boolean(?undef) -> false. parse_cg_ids("") -> []; parse_cg_ids("all") -> all; parse_cg_ids(Str) -> [bin(I) || I <- string:tokens(Str, ",")]. 
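For illustration, the `--ssl` branch of `parse_connection_config/1` above assembles a proplist and `FilterPred` drops every entry whose value is `?undef`, so only the flags actually given on the command line survive. A minimal sketch with an assumed invocation (`undefined` stands in for the `?undef` macro; values are illustrative):

```erlang
%% Assumed invocation: brod-cli meta -b localhost:9093 --ssl --ssl-versions 1.3,1.2
%% No --cacertfile/--certfile/--keyfile given, so those entries are
%% removed by the filter, leaving only versions and verify.
Opts = [{cacertfile, undefined},
        {certfile, undefined},
        {keyfile, undefined},
        {versions, ['tlsv1.3', 'tlsv1.2']},
        {verify, verify_none}],
[{versions, ['tlsv1.3', 'tlsv1.2']}, {verify, verify_none}] =
  lists:filter(fun({_, V}) -> V =/= undefined end, Opts).
```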
+parse_ssl_versions(?undef) -> + parse_ssl_versions(""); +parse_ssl_versions(Versions) -> + case lists:map(fun parse_ssl_version/1, string:tokens(Versions, ", ")) of + [] -> + ['tlsv1.2']; + Vsns -> + Vsns + end. + +parse_ssl_version("1.2") -> + 'tlsv1.2'; +parse_ssl_version("1.3") -> + 'tlsv1.3'; +parse_ssl_version("1.1") -> + 'tlsv1.1'; +parse_ssl_version(Other) -> + error({unsupported_tls_version, Other}). + parse_file(?undef) -> ?undef; parse_file(Path) -> diff --git a/src/brod_consumer.erl b/src/brod_consumer.erl index 37cb1756..a4c39d5c 100644 --- a/src/brod_consumer.erl +++ b/src/brod_consumer.erl @@ -462,7 +462,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref1}, } = State) when Ref1 =/= Ref2 -> %% Not expected response, discard {noreply, State}; -handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, +handle_fetch_response(#kpro_rsp{ref = Ref, vsn = Vsn} = Rsp, #state{ topic = Topic , partition = Partition , last_req_ref = Ref @@ -472,7 +472,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, {ok, #{ header := Header , batches := Batches }} -> - handle_batches(Header, Batches, State); + handle_batches(Header, Batches, State, Vsn); {error, ErrorCode} -> Error = #kafka_fetch_error{ topic = Topic , partition = Partition @@ -481,7 +481,7 @@ handle_fetch_response(#kpro_rsp{ref = Ref} = Rsp, handle_fetch_error(Error, State) end. -handle_batches(?undef, [], #state{} = State0) -> +handle_batches(?undef, [], #state{} = State0, _Vsn) -> %% It is only possible to end up here in a incremental %% fetch session, empty fetch response implies no %% new messages to fetch, and no changes in partition @@ -491,25 +491,38 @@ handle_batches(?undef, [], #state{} = State0) -> State = maybe_delay_fetch_request(State0), {noreply, State}; handle_batches(_Header, ?incomplete_batch(Size), - #state{max_bytes = MaxBytes} = State0) -> + #state{max_bytes = MaxBytes} = State0, _Vsn) -> %% max_bytes is too small to fetch ONE complete batch true = Size > MaxBytes, %% assert State1 = State0#state{max_bytes = Size}, State = maybe_send_fetch_request(State1), {noreply, State}; -handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) -> +handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0, Vsn) -> StableOffset = brod_utils:get_stable_offset(Header), State = case BeginOffset < StableOffset of - true -> - %% There are chances that kafka may return empty message set + true when Vsn > 0 -> + %% There are chances that Kafka may return empty message set %% when messages are deleted from a compacted topic. %% Since there is no way to know how big the 'hole' is %% we can only bump begin_offset with +1 and try again. 
+          ?BROD_LOG_WARNING("~s-~p empty_batch_detected_at_offset=~p, "
+                            "fetch_api_vsn=~p, skip_to_offset=~p",
+                            [State0#state.topic,
+                             State0#state.partition,
+                             BeginOffset,
+                             Vsn,
+                             BeginOffset + 1
+                            ]),
           State1 = State0#state{begin_offset = BeginOffset + 1},
           maybe_send_fetch_request(State1);
+        true ->
+          %% Fetch API v0 (Kafka 0.9 and 0.10) seems to have a race condition:
+          %% Kafka may return an empty batch even if BeginOffset is lower than
+          %% the high watermark, if fetch requests are sent in a tight loop.
+          %% A retry resolves the issue.
+          maybe_delay_fetch_request(State0);
         false ->
-          %% we have either reached the end of a partition
+          %% We have either reached the end of a partition
           %% or trying to read uncommitted messages
           %% try to poll again (maybe after a delay)
           maybe_delay_fetch_request(State0)
@@ -521,7 +534,7 @@ handle_batches(Header, Batches,
                , begin_offset = BeginOffset
                , topic = Topic
                , partition = Partition
-               } = State0) ->
+               } = State0, _Vsn) ->
   StableOffset = brod_utils:get_stable_offset(Header),
   {NewBeginOffset, Messages} =
     brod_utils:flatten_batches(BeginOffset, Header, Batches),
@@ -721,6 +734,8 @@ send_fetch_request(#state{ begin_offset = BeginOffset
                          } = State) ->
   (is_integer(BeginOffset) andalso BeginOffset >= 0)
     orelse erlang:error({bad_begin_offset, BeginOffset}),
+  %% MaxBytes=0 will make no progress when it's Kafka 0.9
+  MaxBytes = max(12, State#state.max_bytes),
   Request =
     brod_kafka_request:fetch(Connection,
                              State#state.topic,
@@ -728,7 +743,7 @@ send_fetch_request(#state{ begin_offset = BeginOffset
                              State#state.begin_offset,
                              State#state.max_wait_time,
                              State#state.min_bytes,
-                             State#state.max_bytes,
+                             MaxBytes,
                              State#state.isolation_level),
   case kpro:request_async(Connection, Request) of
     ok ->
diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl
index 5ae1ac9c..50884d1a 100644
--- a/src/brod_group_coordinator.erl
+++ b/src/brod_group_coordinator.erl
@@ -526,7 +526,11 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds
   State3 = State2#state{is_in_group = false},
 
   %% 4. Clean up state based on the last failure reason
-  State = maybe_reset_member_id(State3, Reason),
+  State4 = maybe_reset_member_id(State3, Reason),
+
+  %% 5. Clean up the ongoing heartbeat request ref if the connection
+  %% was closed
+  State = maybe_reset_hb_ref(State4, Reason),
 
   %% 5. ensure we have a connection to the (maybe new) group coordinator
   F1 = fun discover_coordinator/1,
@@ -591,6 +595,15 @@ should_reset_member_id({connection_down, _Reason}) ->
 should_reset_member_id(_) ->
   false.
 
+%% When the connection goes down while waiting for a heartbeat
+%% response, the response will never be received.
+%% Reset the heartbeat ref to let a new heartbeat request be
+%% sent over the new connection.
+maybe_reset_hb_ref(State, {connection_down, _Reason}) ->
+  State#state{hb_ref = ?undef};
+maybe_reset_hb_ref(State, _) ->
+  State.
+
 -spec join_group(state()) -> {ok, state()}.
 join_group(#state{ groupId = GroupId
                  , memberId = MemberId0
diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl
index 4f0c40ae..7dd69f9f 100644
--- a/src/brod_group_subscriber_worker.erl
+++ b/src/brod_group_subscriber_worker.erl
@@ -22,7 +22,7 @@
 -include("brod_int.hrl").
 
 %% brod_topic_subscriber callbacks
--export([init/2, handle_message/3, terminate/2]).
+-export([init/2, handle_message/3, handle_info/2, terminate/2]).
 
 -type start_options() ::
   #{ group_id := brod:group_id()
@@ -91,6 +91,15 @@ handle_message(_Partition, Msg, State) ->
       {ok, NewState}
   end.
 
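The hunk continues below by adding a `handle_info/2` forwarder to the worker, so callback modules can receive arbitrary messages sent to the subscriber process. A minimal callback sketch (the message shapes here are hypothetical, made up for illustration):

```erlang
%% Optional handle_info/2 in a subscriber callback module: unhandled
%% worker messages land here; only {noreply, NewState} may be returned.
handle_info({get_progress, From}, State) ->
  From ! {progress, State},    %% reply to an arbitrary caller process
  {noreply, State};
handle_info(_Other, State) ->
  {noreply, State}.            %% ignore anything unknown
```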
+handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) ->
+  %% Any unhandled messages are forwarded to the callback module to
+  %% support arbitrary message-passing.
+  %% Only the {noreply, State} return value is supported.
+  case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
+    {noreply, NewCbState} ->
+      {noreply, State#state{cb_state = NewCbState}}
+  end.
+
 terminate(Reason, #state{cb_module = CbModule, cb_state = State}) ->
   brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok).
 
diff --git a/src/brod_topic_subscriber.erl b/src/brod_topic_subscriber.erl
index 08f6512d..9b8add98 100644
--- a/src/brod_topic_subscriber.erl
+++ b/src/brod_topic_subscriber.erl
@@ -108,7 +108,12 @@
 %% This callback is called before stopping the subscriber
 -callback terminate(_Reason, cb_state()) -> _.
 
--optional_callbacks([terminate/2]).
+%% This callback is called when the subscriber receives a message unrelated to
+%% the subscription.
+%% The callback must return `{noreply, NewCallbackState}'.
+-callback handle_info(_Msg, cb_state()) -> {noreply, cb_state()}.
+
+-optional_callbacks([terminate/2, handle_info/2]).
 
 %%%_* Types and macros =========================================================
@@ -357,8 +362,14 @@ handle_info({'DOWN', _Mref, process, Pid, Reason},
       %% not a consumer pid
       {noreply, State}
   end;
-handle_info(_Info, State) ->
-  {noreply, State}.
+handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) ->
+  %% Any unhandled messages are forwarded to the callback module to
+  %% support arbitrary message-passing.
+  %% Only the {noreply, State} return value is supported.
+  case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
+    {noreply, NewCbState} ->
+      {noreply, State#state{cb_state = NewCbState}}
+  end.
 
 %% @private
 handle_call(Call, _From, State) ->
diff --git a/src/brod_utils.erl b/src/brod_utils.erl
index 61113519..f7732a1a 100644
--- a/src/brod_utils.erl
+++ b/src/brod_utils.erl
@@ -453,12 +453,25 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
       fetch(Conn, ReqFun, Offset, Size);
     {ok, #{header := Header, batches := Batches}} ->
       StableOffset = get_stable_offset(Header),
-      {NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches),
+      {NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches),
       case Offset < StableOffset andalso Msgs =:= [] of
         true ->
-          %% Not reached the latest stable offset yet,
-          %% but received an empty batch-set (all messages are dropped).
-          %% try again with new begin-offset
+          NewBeginOffset =
+            case NewBeginOffset0 > Offset of
+              true ->
+                %% Not reached the latest stable offset yet,
+                %% but the result is an empty batch-set,
+                %% i.e. all messages were dropped because they are
+                %% before the last fetch Offset.
+                %% Try again with the new begin-offset.
+                NewBeginOffset0;
+              false when NewBeginOffset0 =:= Offset ->
+                %% There are chances that Kafka may return empty message set
+                %% when messages are deleted from a compacted topic.
+                %% Since there is no way to know how big the 'hole' is
+                %% we can only bump begin_offset with +1 and try again.
+                NewBeginOffset0 + 1
+            end,
           fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
         false ->
           {ok, {StableOffset, Msgs}}
diff --git a/test/brod_SUITE.erl b/test/brod_SUITE.erl
index 0ea476be..a7f0bb51 100644
--- a/test/brod_SUITE.erl
+++ b/test/brod_SUITE.erl
@@ -41,10 +41,11 @@
 
 suite() -> [{timetrap, {minutes, 5}}].
 
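The `brod_utils:fetch/4` change above distinguishes two empty-result cases. A standalone distillation of that decision (a hypothetical helper, not part of brod):

```erlang
%% Given the requested Offset and the NewBeginOffset0 computed by
%% flatten_batches/3, pick the offset for the retry fetch.
next_begin_offset(Offset, NewBeginOffset0) when NewBeginOffset0 > Offset ->
  %% progress was made: all fetched messages predate the request offset
  NewBeginOffset0;
next_begin_offset(Offset, Offset) ->
  %% no progress: a compaction 'hole'; skip one offset and retry
  Offset + 1.
```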
 init_per_suite(Config) ->
-  case os:getenv("KAFKA_VERSION") of
-    "0.9" -> {skip,
-              "The given Kafka test image does not have support for these apis"};
-    _ -> Config
+  case kafka_test_helper:kafka_version() of
+    {0, 9} ->
+      {skip, "no_topic_management_apis"};
+    _ ->
+      Config
   end.
 
 end_per_suite(_Config) -> ok.
diff --git a/test/brod_cli_tests.erl b/test/brod_cli_tests.erl
index a9615283..014c9e1f 100644
--- a/test/brod_cli_tests.erl
+++ b/test/brod_cli_tests.erl
@@ -37,9 +37,12 @@ meta_test() ->
 
 ssl_test() ->
   run(["meta", "-b", "localhost:9093", "-L",
-       "--cacertfile", "priv/ssl/ca.crt",
-       "--keyfile", "priv/ssl/client.key",
-       "--certfile", "priv/ssl/client.crt"]).
+       "--ssl",
+       "--cacertfile", "test/data/ssl/ca.pem",
+       "--keyfile", "test/data/ssl/client-key.pem",
+       "--certfile", "test/data/ssl/client-crt.pem",
+       "--ssl-versions", "1.2,1.1"
+      ]).
 
 offset_test() ->
   Args = ["offset", "-b", "localhost", "-t", "test-topic", "-p", "0"],
@@ -74,9 +77,10 @@ test_sasl() ->
   Output =
     cmd(["send",
          "--brokers", "localhost:9194,localhost:9094",
         "-t", "test-topic", "-p", "0",
-         "--cacertfile", "priv/ssl/ca.crt",
-         "--keyfile", "priv/ssl/client.key",
-         "--certfile", "priv/ssl/client.crt",
+         "--ssl",
+         "--cacertfile", "test/data/ssl/ca.pem",
+         "--keyfile", "test/data/ssl/client-key.pem",
+         "--certfile", "test/data/ssl/client-crt.pem",
          "--sasl-plain", "sasl.testdata",
          "-k", K, "-v", V]),
   ?assertEqual(<<"">>, Output),
@@ -176,7 +180,13 @@ get_kafka_version() ->
       {list_to_integer(Major), list_to_integer(Minor)}
   end.
 
-run(Args) ->
+run(Args0) ->
+  Args = case kafka_test_helper:kafka_version() of
+           {0, Minor} when Minor < 11 ->
+             Args0 ++ ["--no-api-vsn-query"];
+           _ ->
+             Args0
+         end,
   _ = cmd(Args),
   ok.
 
diff --git a/test/brod_client_SUITE.erl b/test/brod_client_SUITE.erl
index dfef00aa..eff92257 100644
--- a/test/brod_client_SUITE.erl
+++ b/test/brod_client_SUITE.erl
@@ -385,11 +385,13 @@ t_magic_version(Config) when is_list(Config) ->
 auth(_Host, _Sock, _Mod, _ClientId, _Timeout, _Opts) -> ok.
 
 ssl_options() ->
-  PrivDir = code:priv_dir(brod),
-  Fname = fun(Name) -> filename:join([PrivDir, ssl, Name]) end,
-  [ {cacertfile, Fname("ca.crt")}
-  , {keyfile, Fname("client.key")}
-  , {certfile, Fname("client.crt")}
+  LibDir = code:lib_dir(brod),
+  Fname = fun(Name) -> filename:join([LibDir, test, data, ssl, Name]) end,
+  [ {cacertfile, Fname("ca.pem")}
+  , {keyfile, Fname("client-key.pem")}
+  , {certfile, Fname("client-crt.pem")}
+  , {versions, ['tlsv1.2']}
+  , {verify, verify_none}
   ].
 
 produce_and_consume_message(Host, Client, ClientConfig) ->
diff --git a/test/brod_compression_SUITE.erl b/test/brod_compression_SUITE.erl
index ac8b9122..b51540b9 100644
--- a/test/brod_compression_SUITE.erl
+++ b/test/brod_compression_SUITE.erl
@@ -201,10 +201,7 @@ start_client(Hosts, ClientId) ->
   brod:start_client(Hosts, ClientId, Config).
 
 client_config() ->
-  case os:getenv("KAFKA_VERSION") of
-    "0.9" ++ _ -> [{query_api_versions, false}];
-    _ -> []
-  end.
+  kafka_test_helper:client_config().
 
 %%%_* Emacs ====================================================================
 %%% Local Variables:
diff --git a/test/brod_consumer_SUITE.erl b/test/brod_consumer_SUITE.erl
index 4c60a4a0..93774d1a 100644
--- a/test/brod_consumer_SUITE.erl
+++ b/test/brod_consumer_SUITE.erl
@@ -76,6 +76,17 @@
       end
   end()).
 
+-define(RETRY(EXPR, Timeout),
+        retry(
+          fun() ->
+              case EXPR of
+                {ok, R} ->
+                  {ok, R};
+                {error, _} ->
+                  false
+              end
+          end, Timeout)).
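The `ssl_options/0` list above is what the client suite feeds to `brod:start_client/3`. A hedged usage sketch (the client id and endpoint are illustrative; brod accepts `{ssl, true | Options}` in the client config):

```erlang
%% Start a TLS client using the option list built by ssl_options/0 above.
{ok, _} = application:ensure_all_started(brod),
ok = brod:start_client([{"localhost", 9093}], my_ssl_client,
                       [{ssl, ssl_options()}]).
```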
+ %%%_* ct callbacks ============================================================= suite() -> [{timetrap, {seconds, 30}}]. @@ -104,11 +115,7 @@ init_per_testcase(Case, Config0) -> end, ClientConfig1 = proplists:get_value(client_config, Config, []), brod:stop_client(Client), - ClientConfig = - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end ++ ClientConfig0 ++ ClientConfig1, + ClientConfig = kafka_test_helper:client_config() ++ ClientConfig0 ++ ClientConfig1, ok = brod:start_client(?HOSTS, Client, ClientConfig), ok = brod:start_producer(Client, Topic, ProducerConfig), try ?MODULE:Case(standalone_consumer) of @@ -140,12 +147,21 @@ end_per_testcase(Case, Config) -> ok end. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + Cases = [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false - end]. - + end], + Filter = fun(Case) -> + try + ?MODULE:Case(kafka_version_match) + catch + _:_ -> + true + end + end, + lists:filter(Filter, Cases). %%%_* Test functions =========================================================== @@ -153,14 +169,11 @@ all() -> [F || {F, _A} <- module_info(exports), %% messages fetched back should only contain the committed message %% i.e. aborted messages (testing with isolation_level=read_committed) %% should be dropped, control messages (transaction abort) should be dropped +t_drop_aborted(kafka_version_match) -> + has_txn(); t_drop_aborted(Config) when is_list(Config) -> - case has_txn() of - true -> - test_drop_aborted(Config, true), - test_drop_aborted(Config, false); - false -> - ok - end. + test_drop_aborted(Config, true), + test_drop_aborted(Config, false). %% When QueryApiVsn is set to false, %% brod will use lowest supported API version. @@ -173,7 +186,7 @@ test_drop_aborted(Config, QueryApiVsn) -> fun(CommitOrAbort) -> TxnId = make_transactional_id(), {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), Key = bin([atom_to_list(CommitOrAbort), "-", make_unique_key()]), Vsn = 3, %% lowest API version which supports transactional produce @@ -217,11 +230,10 @@ test_drop_aborted(Config, QueryApiVsn) -> ], Msgs) end. +t_wait_for_unstable_offsets(kafka_version_match) -> + has_txn(); t_wait_for_unstable_offsets(Config) when is_list(Config) -> - case has_txn() of - true -> t_wait_for_unstable_offsets({run, Config}); - false -> ok - end; + t_wait_for_unstable_offsets({run, Config}); t_wait_for_unstable_offsets({run, Config}) -> Client = ?config(client), Topic = ?TOPIC, @@ -230,7 +242,7 @@ t_wait_for_unstable_offsets({run, Config}) -> {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)), %% ensure we have enough time to test before expire TxnOpts = #{txn_timeout => timer:seconds(30)}, - {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId, TxnOpts), + {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId, TxnOpts), 10), ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]), %% Send one message in this transaction, return the offset in kafka ProduceFun = @@ -278,13 +290,10 @@ t_wait_for_unstable_offsets({run, Config}) -> %% Produce large(-ish) transactional batches, then abort them all %% try fetch from offsets in the middle of large batches, %% expect no delivery of any aborted batches. 
+t_fetch_aborted_from_the_middle(kafka_version_match) ->
+  has_txn();
 t_fetch_aborted_from_the_middle(Config) when is_list(Config) ->
-  case has_txn() of
-    true ->
-      test_fetch_aborted_from_the_middle(Config);
-    false ->
-      ok
-  end.
+  test_fetch_aborted_from_the_middle(Config).
 
 test_fetch_aborted_from_the_middle(Config) when is_list(Config) ->
   Client = ?config(client),
@@ -292,7 +301,7 @@ test_fetch_aborted_from_the_middle(Config) when is_list(Config) ->
   Partition = 0,
   TxnId = make_transactional_id(),
   {ok, Conn} = connect_txn_coordinator(TxnId, ?config(client_config)),
-  {ok, TxnCtx} = kpro:txn_init_ctx(Conn, TxnId),
+  {ok, TxnCtx} = ?RETRY(kpro:txn_init_ctx(Conn, TxnId), 10),
   ok = kpro:txn_send_partitions(TxnCtx, [{Topic, Partition}]),
   %% make a large-ish message
   MkMsg = fun(Key) ->
@@ -413,6 +422,12 @@ t_fold(Config) when is_list(Config) ->
                            0, ErrorFoldF, #{})),
   ok.
 
+%% This test case does not work with Kafka 0.9; not sure about 0.10 and 0.11.
+%% Since all 0.x versions are old enough, we only try to verify this against
+%% 1.x or newer.
+t_direct_fetch_with_small_max_bytes(kafka_version_match) ->
+  {Major, _Minor} = kafka_test_helper:kafka_version(),
+  Major > 1;
 t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) ->
   Client = ?config(client),
   Topic = ?TOPIC,
@@ -428,6 +443,11 @@ t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) ->
   ?assertEqual(Key, Msg#kafka_message.key),
   ok.
 
+%% Starting from version 3, Kafka no longer returns an incomplete batch
+%% for Fetch request v0, so max_bytes expansion can no longer be tested.
+t_direct_fetch_expand_max_bytes(kafka_version_match) ->
+  {Major, _Minor} = kafka_test_helper:kafka_version(),
+  Major < 3;
 t_direct_fetch_expand_max_bytes({init, Config}) when is_list(Config) ->
   %% kafka returns empty message set when it's 0.9
   %% or when fetch request sent was version 0
@@ -441,7 +461,7 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) ->
   Value = crypto:strong_rand_bytes(100),
   ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value),
   {ok, Offset} = brod:resolve_offset(?HOSTS, Topic, Partition,
-                                    ?OFFSET_LATEST, ?config(client_config)),
+                                     ?OFFSET_LATEST, ?config(client_config)),
   {ok, {_, [Msg]}} = brod:fetch({?HOSTS, ?config(client_config)},
                                 Topic, Partition, Offset - 1,
                                 #{max_bytes => 13}),
@@ -450,6 +470,9 @@
 
 %% @doc Consumer should be smart enough to try greater max_bytes
 %% when it's not great enough to fetch one single message
+t_consumer_max_bytes_too_small(kafka_version_match) ->
+  {Major, _Minor} = kafka_test_helper:kafka_version(),
+  Major < 3;
 t_consumer_max_bytes_too_small({init, Config}) ->
   meck:new(brod_kafka_request, [passthrough, no_passthrough_cover, no_history]),
   %% kafka returns empty message set when it's 0.9
@@ -464,9 +487,8 @@ t_consumer_max_bytes_too_small(Config) ->
   brod:unsubscribe(Client, ?TOPIC, Partition),
   Key = make_unique_key(),
   ValueBytes = 2000,
-  MaxBytes1 = 8,  %% too small for even the header
-  MaxBytes2 = 12, %% too small but message size is fetched
-  MaxBytes3 = size(Key) + ValueBytes,
+  MaxBytes1 = 12, %% too small for even the header
+  MaxBytes2 = size(Key) + ValueBytes,
   Tester = self(),
   F = fun(Conn, Topic, Partition1, BeginOffset, MaxWait, MinBytes,
           MaxBytes, IsolationLevel) ->
@@ -482,8 +504,7 @@ t_consumer_max_bytes_too_small(Config) ->
   brod:subscribe(Client, self(), ?TOPIC, Partition, Options),
   ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value),
   ok = wait_for_max_bytes_sequence([{'=', MaxBytes1},
-                                    {'=', 
MaxBytes2}, - {'>', MaxBytes3}], + {'>', MaxBytes2}], _TriedCount = 0), ?WAIT_ONLY( {ConsumerPid, #kafka_message_set{messages = Messages}}, @@ -843,7 +864,9 @@ wait_for_max_bytes_sequence([{Compare, MaxBytes} | Rest] = Waiting, Cnt) -> wait_for_max_bytes_sequence(Waiting, Cnt + 1); _ -> ct:fail("unexpected ~p, expecting ~p", [Bytes, {Compare, MaxBytes}]) - end + end; + Other -> + error(Other) after 3000 -> ct:fail("timeout", []) @@ -875,13 +898,6 @@ connect_txn_coordinator(TxnId, Config, RetriesLeft, _LastError) -> connect_txn_coordinator(TxnId, Config, RetriesLeft - 1, Reason) end. -has_txn() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> false; - "0.10" ++ _ -> false; - _ -> true - end. - consumer_config() -> [{max_wait_time, 1000}, {sleep_timeout, 10}]. retry(_F, 0) -> error(timeout); @@ -901,6 +917,14 @@ wait_for_consumer_connection(Consumer, OldConn) -> end, retry(F, 5). +has_txn() -> + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + false; + _ -> + true + end. + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/test/brod_demo_group_subscriber_koc.erl b/test/brod_demo_group_subscriber_koc.erl index 49407ea6..5680c780 100644 --- a/test/brod_demo_group_subscriber_koc.erl +++ b/test/brod_demo_group_subscriber_koc.erl @@ -218,10 +218,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_demo_group_subscriber_loc.erl b/test/brod_demo_group_subscriber_loc.erl index 39f5d543..b9f7539e 100644 --- a/test/brod_demo_group_subscriber_loc.erl +++ b/test/brod_demo_group_subscriber_loc.erl @@ -235,10 +235,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %%%_* Emacs ==================================================================== %%% Local Variables: diff --git a/test/brod_demo_topic_subscriber.erl b/test/brod_demo_topic_subscriber.erl index 77de8500..e49d5b73 100644 --- a/test/brod_demo_topic_subscriber.erl +++ b/test/brod_demo_topic_subscriber.erl @@ -177,10 +177,7 @@ os_time_utc_str() -> lists:flatten(S). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). commit_dir(Topic) -> filename:join(["/tmp", Topic]). diff --git a/test/brod_group_coordinator_SUITE.erl b/test/brod_group_coordinator_SUITE.erl index ace01079..01a3e47f 100644 --- a/test/brod_group_coordinator_SUITE.erl +++ b/test/brod_group_coordinator_SUITE.erl @@ -73,10 +73,7 @@ common_end_per_testcase(_Case, Config) when is_list(Config) -> ok = application:stop(brod). client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). 
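All suites now delegate to `kafka_test_helper:client_config/0` rather than probing `KAFKA_VERSION` themselves. The helper, added in `kafka_test_helper.erl` at the end of this diff, reduces to:

```erlang
%% Behavior of the consolidated helper: only Kafka 0.9 needs the
%% ApiVersions query disabled, since that API does not exist there.
client_config() ->
  case kafka_test_helper:kafka_version() of
    {0, 9} -> [{query_api_versions, false}];
    _ -> []
  end.
```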
%%%_* Group coordinator callbacks ============================================== diff --git a/test/brod_group_subscriber_SUITE.erl b/test/brod_group_subscriber_SUITE.erl index d55dea72..7ad69d14 100644 --- a/test/brod_group_subscriber_SUITE.erl +++ b/test/brod_group_subscriber_SUITE.erl @@ -550,6 +550,7 @@ t_async_commit({init, Config}) -> integer_to_list(rand:uniform(1000000000))), [{group_id, GroupId} | Config]; t_async_commit(Config) when is_list(Config) -> + GroupId = ?config(group_id), Behavior = ?config(behavior), Topic = ?topic, Partition = 0, @@ -562,7 +563,7 @@ t_async_commit(Config) when is_list(Config) -> , {partition_restart_delay_seconds, 1} , {begin_offset, Offset} ], - {ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], + {ok, SubscriberPid} = start_subscriber(GroupId, Config, [Topic], GroupConfig, ConsumerConfig, InitArgs), SubscriberPid diff --git a/test/brod_offset_txn_SUITE.erl b/test/brod_offset_txn_SUITE.erl index 5565a8a4..aee57168 100644 --- a/test/brod_offset_txn_SUITE.erl +++ b/test/brod_offset_txn_SUITE.erl @@ -36,8 +36,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -72,17 +77,14 @@ end_per_testcase(_Case, Config) -> end, Config. -all() -> [F || {F, _A} <- module_info(exports), +all() -> + [F || {F, _A} <- module_info(exports), case atom_to_list(F) of "t_" ++ _ -> true; _ -> false end]. -client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. +client_config() -> kafka_test_helper:client_config(). init(GroupId, #{ client := Client diff --git a/test/brod_producer_SUITE.erl b/test/brod_producer_SUITE.erl index 558340f4..40004da3 100644 --- a/test/brod_producer_SUITE.erl +++ b/test/brod_producer_SUITE.erl @@ -452,10 +452,7 @@ t_configure_produce_api_vsn(Config) when is_list(Config) -> %%%_* Help functions =========================================================== client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + kafka_test_helper:client_config(). %% os:timestamp should be unique enough for testing make_unique_kv() -> diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 9c022a9d..db61189d 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -30,6 +30,7 @@ -export([ init/2 , terminate/2 , handle_message/3 + , handle_info/2 ]). %% Test cases @@ -40,6 +41,7 @@ , t_callback_crash/1 , t_begin_offset/1 , t_cb_fun/1 + , t_consumer_ack_via_message_passing/1 ]). -include("brod_test_setup.hrl"). @@ -107,6 +109,21 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck false -> {ok, ack, State} end. +handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter + , worker_id = Ref + } = State0) -> + %% Participate in state continuity checks + ?tp(topic_subscriber_seen_info, + #{ partition => Partition + , offset => Offset + , msg => Msg + , state => Counter + , worker_id => Ref + }), + State = State0#state{counter = Counter + 1}, + ok = brod_topic_subscriber:ack(self(), Partition, Offset), + {noreply, State}. 
+ terminate(Reason, #state{worker_id = Ref, counter = Counter}) -> ?tp(topic_subscriber_terminate, #{ worker_id => Ref @@ -184,6 +201,44 @@ t_async_acks(Config) when is_list(Config) -> check_init_terminate(Trace) end). +t_consumer_ack_via_message_passing(Config) when is_list(Config) -> + %% Process messages one by one with no prefetch + ConsumerConfig = [ {prefetch_count, 0} + , {prefetch_bytes, 0} + , {sleep_timeout, 0} + , {max_bytes, 0} + ], + Partition = 0, + SendFun = + fun(I) -> + produce({?topic, Partition}, integer_to_binary(I)) + end, + ?check_trace( + %% Run stage: + begin + O0 = SendFun(0), + %% Send two messages + Offset0 = SendFun(1), + _Offset1 = SendFun(2), + InitArgs = {_IsAsyncAck = true, + _ConsumerOffsets = [{0, O0}]}, + {ok, SubscriberPid} = + brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig, + ?MODULE, InitArgs), + {ok, _} = wait_message(<<"1">>), + %% ack_offset allows consumer to proceed to message 2 + SubscriberPid ! {ack_offset, 0, Offset0}, + {ok, _} = wait_message(<<"2">>), + ok = brod_topic_subscriber:stop(SubscriberPid), + _Expected = [<<"1">>, <<"2">>] + end, + %% Check stage: + fun(Expected, Trace) -> + check_received_messages(Expected, Trace), + check_state_continuity(Trace), + check_init_terminate(Trace) + end). + t_begin_offset(Config) when is_list(Config) -> ConsumerConfig = [ {prefetch_count, 100} , {prefetch_bytes, 0} %% as discard diff --git a/test/brod_txn_SUITE.erl b/test/brod_txn_SUITE.erl index b5fb9294..8892695c 100644 --- a/test/brod_txn_SUITE.erl +++ b/test/brod_txn_SUITE.erl @@ -29,8 +29,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -80,10 +85,7 @@ all() -> [F || {F, _A} <- module_info(exports), end]. client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. + []. subscriber_loop(TesterPid) -> receive diff --git a/test/brod_txn_processor_SUITE.erl b/test/brod_txn_processor_SUITE.erl index cc2d5e3d..29ee0e3c 100644 --- a/test/brod_txn_processor_SUITE.erl +++ b/test/brod_txn_processor_SUITE.erl @@ -35,8 +35,13 @@ suite() -> [{timetrap, {seconds, 30}}]. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - Config. + case kafka_test_helper:kafka_version() of + {0, Minor} when Minor < 11 -> + {skip, "no_transaction"}; + _ -> + {ok, _} = application:ensure_all_started(brod), + Config + end. end_per_suite(_Config) -> ok. @@ -65,11 +70,7 @@ all() -> [F || {F, _A} <- module_info(exports), _ -> false end]. -client_config() -> - case os:getenv("KAFKA_VERSION") of - "0.9" ++ _ -> [{query_api_versions, false}]; - _ -> [] - end. +client_config() -> kafka_test_helper:client_config(). rand() -> iolist_to_binary([base64:encode(crypto:strong_rand_bytes(8))]). diff --git a/test/data/ssl/README.md b/test/data/ssl/README.md new file mode 100644 index 00000000..70880474 --- /dev/null +++ b/test/data/ssl/README.md @@ -0,0 +1,3 @@ +This dir holds files for TLS/SSL tests. +The files are copied from Kafka docker image in the `make test-env` step. 
+See how the docker image is built here: https://github.com/zmstone/docker-kafka
diff --git a/test/kafka_test_helper.erl b/test/kafka_test_helper.erl
index 56ac9b4f..fd6f0ce3 100644
--- a/test/kafka_test_helper.erl
+++ b/test/kafka_test_helper.erl
@@ -17,6 +17,7 @@
   , client_config/0
   , bootstrap_hosts/0
   , kill_process/2
+  , kafka_version/0
   ]).
 
 -include("brod_test_macros.hrl").
@@ -82,9 +83,30 @@ produce_payloads(Topic, Partition, Config) ->
   {LastOffset, Payloads}.
 
 client_config() ->
-  case os:getenv("KAFKA_VERSION") of
-    "0.9" ++ _ -> [{query_api_versions, false}];
-    _ -> []
+  case kafka_version() of
+    {0, 9} -> [{query_api_versions, false}];
+    _ -> []
+  end.
+
+maybe_zookeeper() ->
+  case kafka_version() of
+    {3, _} ->
+      %% Kafka 2.2 started supporting --bootstrap-server, but 2.x still
+      %% supports --zookeeper. Starting from 3.0, --zookeeper is no longer
+      %% supported; --bootstrap-server must be used.
+      "--bootstrap-server localhost:9092";
+    _ ->
+      "--zookeeper localhost:2181"
+  end.
+
+kafka_version() ->
+  VsnStr = os:getenv("KAFKA_VERSION"),
+  case VsnStr =:= "" orelse VsnStr =:= false of
+    true ->
+      ct:pal("KAFKA_VERSION is not set, defaulting to 3.6", []),
+      {3, 6};
+    false ->
+      [Major, Minor | _] = string:tokens(VsnStr, "."),
+      {list_to_integer(Major), list_to_integer(Minor)}
   end.
 
 prepare_topic(Topic) when is_binary(Topic) ->
@@ -97,12 +119,12 @@ prepare_topic({Topic, NumPartitions, NumReplicas}) ->
   ok = brod:start_producer(?TEST_CLIENT_ID, Topic, _ProducerConfig = []).
 
 delete_topic(Name) ->
-  Delete = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost "
+  Delete = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++
           " --delete --topic ~s",
   exec_in_kafka_container(Delete, [Name]).
 
 create_topic(Name, NumPartitions, NumReplicas) ->
-  Create = "/opt/kafka/bin/kafka-topics.sh --zookeeper localhost "
+  Create = "/opt/kafka/bin/kafka-topics.sh " ++ maybe_zookeeper() ++
           " --create --partitions ~p --replication-factor ~p"
           " --topic ~s --config min.insync.replicas=1",
  exec_in_kafka_container(Create, [NumPartitions, NumReplicas, Name]).
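For reference, a sketch of how `kafka_version/0` maps `KAFKA_VERSION` strings to tuples (illustrative values):

```erlang
%% KAFKA_VERSION="0.9"   -> {0, 9}
%% KAFKA_VERSION="2.8"   -> {2, 8}
%% KAFKA_VERSION="3.6.1" -> {3, 6}  (components beyond major.minor are ignored)
%% Unset or empty        -> logs a note and defaults to {3, 6}
true = os:putenv("KAFKA_VERSION", "3.6.1"),
{3, 6} = kafka_test_helper:kafka_version().
```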