refactor: enable access from the host (#445)
Signed-off-by: Curtis Wan <[email protected]>
mooc9988 authored Nov 14, 2023
1 parent fecb246 commit 6c6eab9
Showing 4 changed files with 67 additions and 40 deletions.
20 changes: 6 additions & 14 deletions README.md
@@ -41,24 +41,19 @@ Launch an AutoMQ for Kafka cluster locally using Docker Compose.

This cluster comprises 1 Controller node, 2 Broker nodes, and an additional LocalStack container to simulate S3 services locally.
``` bash
-cd docker
# launch AutoMQ for Kafka cluster
-docker compose up -d
+docker compose -f docker/docker-compose.yaml up -d
```
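After this change everything runs from the repository root, so a quick host-side check that the nodes came up is just (standard Docker Compose commands, nothing repo-specific):
``` bash
# show service state; all services should be running/healthy
docker compose -f docker/docker-compose.yaml ps
# follow a broker's logs if a service looks unhealthy
docker compose -f docker/docker-compose.yaml logs -f broker1
```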
#### Run a console producer and consumer
1. Create a topic to store your events:
``` bash
-# login to broker1
-docker exec -it broker1 bash
-# go to kafka directory
-cd /opt/kafka/kafka
# create quickstart-events topic
-bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server $(hostname -I | awk '{print $1}'):9092
+bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9094
```
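The switch from `$(hostname -I ...)` inside the container to `localhost:9094` works because broker1 now advertises an EXTERNAL listener published on host port 9094 (see the docker-compose.yaml diff below). Assuming a local Kafka distribution provides the CLI tools, the new topic can be inspected directly from the host:
``` bash
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9094
```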

2. Run the console producer client to write a few events into your topic. By default, each line you enter will result in a separate event being written to the topic.
``` bash
-bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server $(hostname -I | awk '{print $1}'):9092
+bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9094
```
You may input some messages like:
``` text
@@ -68,17 +63,14 @@ This is my second event

3. Run the console consumer client to read the events you just created:
``` bash
-# CRTL-C to exit the producer
+# CTRL-C to exit the consumer
# run console consumer
-bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server $(hostname -I | awk '{print $1}'):9092
+bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9094
```
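For a bounded smoke test instead of a long-running consumer, the standard console-consumer flags can cap the read (a sketch; the flag values are arbitrary):
``` bash
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning \
  --max-messages 2 --timeout-ms 10000 --bootstrap-server localhost:9094
```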

4. Clean up the cluster
``` bash
-# exit from the broker1 container
-exit
# clean up the cluster
-docker compose down -v
+docker compose -f docker/docker-compose.yaml down -v
```
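`down -v` removes the containers along with the named volumes. To verify nothing was left behind (plain Docker commands):
``` bash
docker compose -f docker/docker-compose.yaml ps -a
docker volume ls
```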

[Explore more](https://docs.automq.com/zh/docs/automq-s3kafka/VKpxwOPvciZmjGkHk5hcTz43nde): Second-level partition migration and automatic traffic rebalancing.
3 changes: 2 additions & 1 deletion config/kraft/broker.properties
@@ -197,4 +197,5 @@ autobalancer.reporter.network.in.capacity=102400
autobalancer.reporter.network.out.capacity=102400

# The reporter interval of Auto Balancer metric reporter, default 10s
-autobalancer.reporter.metrics.reporting.interval.ms=10000
\ No newline at end of file
+autobalancer.reporter.metrics.reporting.interval.ms=10000

46 changes: 31 additions & 15 deletions docker/docker-compose.yaml
@@ -30,7 +30,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
# use a static ip
networks:
-      kos_net:
+      afk_net:
ipv4_address: 10.6.0.2

# create needed buckets
@@ -47,36 +47,43 @@ localstack:
localstack:
condition: service_healthy
networks:
-      - kos_net
+      - afk_net

controller:
container_name: "${CONTROLLER_DOCKER_NAME-controller}"
hostname: "${CONTROLLER_DOCKER_NAME-controller}"
-    image: automqinc/kafka:0.6.6
+    image: automqinc/kafka:0.7.0
environment:
- KAFKA_S3_ACCESS_KEY=test
- KAFKA_S3_SECRET_KEY=test
- KAFKA_HEAP_OPTS=-Xms1g -Xmx1g -XX:MetaspaceSize=96m
-    command: bash /opt/kafka/scripts/start.sh up --process.roles controller --node.id 0 --controller.quorum.voters 0@10.6.0.4:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1
-    # use a static ip
+    command: bash /opt/kafka/scripts/start.sh up --process.roles controller --node.id 0 --controller.quorum.voters 0@controller:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1
    networks:
-      kos_net:
-        ipv4_address: 10.6.0.4
+      - afk_net
depends_on:
- localstack
- aws-cli

broker1:
container_name: "${BROKER1_DOCKER_NAME-broker1}"
hostname: "${BROKER1_DOCKER_NAME-broker1}"
-    image: automqinc/kafka:0.6.6
+    image: automqinc/kafka:0.7.0
+    ports:
+      - "9094:9094"
environment:
- KAFKA_S3_ACCESS_KEY=test
- KAFKA_S3_SECRET_KEY=test
- KAFKA_HEAP_OPTS=-Xms1g -Xmx1g -XX:MetaspaceSize=96m -XX:MaxDirectMemorySize=1G
-    command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 1 --controller.quorum.voters 0@10.6.0.4:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 --override autobalancer.reporter.network.in.capacity=5120 --override autobalancer.reporter.network.out.capacity=5120 --override autobalancer.reporter.metrics.reporting.interval.ms=5000
+      - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_IN_CAPACITY=5120
+      - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_OUT_CAPACITY=5120
+      - KAFKA_CFG_AUTOBALANCER_REPORTER_METRICS_REPORTING_INTERVAL_MS=5000
+      # override listener settings
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://:9094
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://broker1:9092,EXTERNAL://localhost:9094
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
+    command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 1 --controller.quorum.voters 0@controller:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1
networks:
-      - kos_net
+      - afk_net
depends_on:
- localstack
- aws-cli
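The broker1 changes implement the usual dual-listener pattern for Docker: `PLAINTEXT://broker1:9092` is advertised to clients inside the `afk_net` network, while `EXTERNAL://localhost:9094` is advertised to clients on the host, with 9094 published via `ports`. A minimal host-side connectivity check, assuming Kafka's CLI tools are installed locally:
``` bash
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9094
```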
@@ -85,14 +92,23 @@ services:
broker2:
container_name: "${BROKER2_DOCKER_NAME-broker2}"
hostname: "${BROKER2_DOCKER_NAME-broker2}"
-    image: automqinc/kafka:0.6.6
+    image: automqinc/kafka:0.7.0
+    ports:
+      - "9095:9095"
environment:
- KAFKA_S3_ACCESS_KEY=test
- KAFKA_S3_SECRET_KEY=test
- KAFKA_HEAP_OPTS=-Xms1g -Xmx1g -XX:MetaspaceSize=96m -XX:MaxDirectMemorySize=1G
-    command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 2 --controller.quorum.voters 0@10.6.0.4:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1 --override autobalancer.reporter.network.in.capacity=5120 --override autobalancer.reporter.network.out.capacity=5120 --override autobalancer.reporter.metrics.reporting.interval.ms=5000
+      - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_IN_CAPACITY=5120
+      - KAFKA_CFG_AUTOBALANCER_REPORTER_NETWORK_OUT_CAPACITY=5120
+      - KAFKA_CFG_AUTOBALANCER_REPORTER_METRICS_REPORTING_INTERVAL_MS=5000
+      # override listener settings
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,EXTERNAL://:9095
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://broker2:9092,EXTERNAL://localhost:9095
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
+    command: bash /opt/kafka/scripts/start.sh up --process.roles broker --node.id 2 --controller.quorum.voters 0@controller:9093 --s3.bucket ko3 --s3.endpoint http://10.6.0.2:4566 --s3.region us-east-1
networks:
-      - kos_net
+      - afk_net
depends_on:
- localstack
- aws-cli
@@ -104,8 +120,8 @@ volumes:
driver: local

networks:
-  kos_net:
-    name: kos_net
+  afk_net:
+    name: afk_net
driver: bridge
ipam:
driver: default
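Once the stack is up, the renamed bridge network can be inspected from the host to confirm the attached containers and the static LocalStack address (plain Docker, nothing repo-specific):
``` bash
docker network inspect afk_net --format '{{json .Containers}}'
```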
38 changes: 28 additions & 10 deletions docker/scripts/start.sh
@@ -80,7 +80,7 @@ help|-h|--help
Display this help message
up [--process.roles ROLE] [--node.id NODE_ID] [--controller.quorum.voters VOTERS]
[--s3.region REGION] [--s3.bucket BUCKET] [--s3.endpoint ENDPOINT]
-   [--s3.access.key ACCESS_KEY] [--s3.secret.key SECRET_KEY] [--override KEY=VALUE]
+   [--s3.access.key ACCESS_KEY] [--s3.secret.key SECRET_KEY]
start node.
EOF
exit "${exit_status}"
@@ -189,7 +189,7 @@ add_settings_for_s3() {
add_or_setup_value "s3.stream.object.split.size" "16777216" "${file_name}"
add_or_setup_value "s3.object.block.size" "16777216" "${file_name}"
add_or_setup_value "s3.object.part.size" "33554432" "${file_name}"
-    add_or_setup_value "s3.cache.size" "1073741824" "${file_name}"
+    add_or_setup_value "s3.block.cache.size" "1073741824" "${file_name}"
add_or_setup_value "stream.set.object.compaction.cache.size" "536870912" "${file_name}"
fi
}
@@ -220,9 +220,32 @@ kafka_monitor_ip() {
fi
}

+configure_from_environment_variables() {
+  file_name=$1
+  # List of special cases to apply to the variables
+  local -r exception_regexps=(
+    "s/sasl\.ssl/sasl_ssl/g"
+    "s/sasl\.plaintext/sasl_plaintext/g"
+  )
+  # Map environment variables to config properties
+  for var in "${!KAFKA_CFG_@}"; do
+    key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' | tr '[:upper:]' '[:lower:]')"
+
+    # Exception for the camel case in this environment variable
+    [[ "$var" == "KAFKA_CFG_ZOOKEEPER_CLIENTCNXNSOCKET" ]] && key="zookeeper.clientCnxnSocket"
+
+    # Apply exception regexps
+    for regex in "${exception_regexps[@]}"; do
+      key="$(echo "$key" | sed "$regex")"
+    done
+
+    value="${!var}"
+    add_or_setup_value "${key}" "${value}" "${file_name}"
+  done
+}
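The new function follows the common `KAFKA_CFG_*` convention: strip the prefix, turn underscores into dots, lowercase, then patch up names such as `sasl_ssl` whose underscores are legitimate. A standalone sketch of the transform, using one of the variables the compose file now sets:
``` bash
# hypothetical standalone demo of the same mapping
var="KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP"
key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' | tr '[:upper:]' '[:lower:]')"
echo "$key"    # prints: listener.security.protocol.map

# the exception regexps restore underscores that the blanket
# substitution above turned into dots, e.g. for SASL listener names
echo "listener.name.sasl.ssl.client.callback.handler.class" \
  | sed 's/sasl\.ssl/sasl_ssl/g'    # prints: listener.name.sasl_ssl.client.callback.handler.class
```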

kafka_up() {
echo "kafka_up: start"
-  override_settings=()

while [[ $# -ge 1 ]]; do
case "${1}" in
@@ -235,7 +258,6 @@ kafka_up() {
--s3.access.key) set_once s3_access_key "${2}" "s3 access key"; shift 2;;
--s3.secret.key) set_once s3_secret_key "${2}" "s3 secret key"; shift 2;;
--s3.endpoint) set_once s3_endpoint "${2}" "s3 endpoint"; shift 2;;
-      --override) [[ -n "${2}" ]] && override_settings+=("${2}"); shift 2;;
esac
done

@@ -294,12 +316,8 @@ kafka_up() {
kafka_monitor_ip
echo "kafka_up: ip settings changed"

-  # override settings
-  for element in "${override_settings[@]}"; do
-    key=$(echo "${element}" | awk -F= '{print $1}')
-    value=$(echo "${element}" | awk -F= '{print $2}')
-    add_or_setup_value "${key}" "${value}" "${kafka_dir}/config/kraft/${process_role}.properties"
-  done
+  # override settings from env
+  configure_from_environment_variables "${kafka_dir}/config/kraft/${process_role}.properties"

# format the data path
must_do -v "${kafka_dir}/bin/kafka-storage.sh format -g -t ${cluster_id} -c ${kafka_dir}/config/kraft/${process_role}.properties"
