forked from ahmayun/cs5614-assignment4
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstart-kafka.sh
149 lines (123 loc) · 4.86 KB
/
start-kafka.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/bin/bash -e
# Turn errexit on explicitly as well: the "-e" in the shebang only takes effect
# when the kernel interprets the shebang (direct execution). When the script is
# started as "bash start-kafka.sh" the shebang is a plain comment.
set -e

# Allow specific kafka versions to perform any unique bootstrap operations.
# The hook is looked up by version and only run when it is executable.
OVERRIDE_FILE="/opt/overrides/${KAFKA_VERSION}.sh"
if [[ -x "$OVERRIDE_FILE" ]]; then
  echo "Executing override file $OVERRIDE_FILE"
  # Execute the file directly. Going through eval would re-parse the path, so
  # whitespace or shell metacharacters in KAFKA_VERSION would be interpreted
  # as shell syntax instead of being part of the file name.
  "$OVERRIDE_FILE"
fi
# Remember the field separator in effect at start-up; later sections that
# change IFS restore it from this copy.
ORIG_IFS="${IFS}"

# A ZooKeeper connection string is mandatory: stop immediately without one.
[[ -n "$KAFKA_ZOOKEEPER_CONNECT" ]] || {
  echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
  exit 1
}
# Default the broker port to 9092 when KAFKA_PORT is unset or empty.
[[ -n "$KAFKA_PORT" ]] || export KAFKA_PORT=9092
# Start the topic-creation helper as a background job so this script can carry
# on and eventually start the broker. The job is forked here, i.e. while
# KAFKA_CREATE_TOPICS is still set in the environment it inherits.
# NOTE(review): create-topics.sh is not part of this file; presumably it reads
# KAFKA_CREATE_TOPICS - confirm against that script.
create-topics.sh &
# Drop the variable from this shell only after the helper has been launched.
# The later pass that turns every KAFKA_* environment variable into a broker
# property therefore never sees it.
unset KAFKA_CREATE_TOPICS
# When no advertised port and no listener configuration was supplied, and the
# Docker socket is available inside the container, ask Docker which host port
# is published for KAFKA_PORT and advertise that one.
if [[ -z "$KAFKA_ADVERTISED_PORT" && \
  -z "$KAFKA_LISTENERS" && \
  -z "$KAFKA_ADVERTISED_LISTENERS" && \
  -S /var/run/docker.sock ]]; then
  # "docker port" output is reduced to what follows the last ':' on each line;
  # only the first line is used. KAFKA_PORT is quoted so it always reaches
  # docker as exactly one argument (no word splitting or globbing).
  KAFKA_ADVERTISED_PORT=$(docker port "$(hostname)" "$KAFKA_PORT" | sed -r 's/.*:(.*)/\1/g' | head -n1)
  export KAFKA_ADVERTISED_PORT
fi
# Pick a broker id only when the caller did not provide one.
if [[ -z "$KAFKA_BROKER_ID" ]]; then
  if [[ -z "$BROKER_ID_COMMAND" ]]; then
    # By default auto allocate broker ID
    export KAFKA_BROKER_ID=-1
  else
    # The id is whatever the user-supplied command prints.
    KAFKA_BROKER_ID="$(eval "$BROKER_ID_COMMAND")"
    export KAFKA_BROKER_ID
  fi
fi
# Without an explicit KAFKA_LOG_DIRS, use a per-host directory under /kafka.
[[ -n "$KAFKA_LOG_DIRS" ]] || export KAFKA_LOG_DIRS="/kafka/kafka-logs-${HOSTNAME}"
#######################################
# Escape a string so that sed inserts it literally when it is used as the
# replacement part of an s/// command (with '/' as the delimiter).
# Arguments: $1 - raw text
# Outputs:   the text with '\', '&' and '/' backslash-escaped, on stdout
#######################################
_escape_sed_replacement() {
  printf '%s' "$1" | sed -e 's|[\\&/]|\\&|g'
}

# Bake a custom heap setting into the broker start script, then remove the
# variable from the environment.
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
  # Options such as -XX:HeapDumpPath=/some/dir contain '/', and '&' or '\' are
  # special in a sed replacement too; escape them so the value lands in the
  # start script verbatim instead of breaking or corrupting the substitution.
  heap_opts_replacement=$(_escape_sed_replacement "$KAFKA_HEAP_OPTS")
  sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$heap_opts_replacement"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
  unset KAFKA_HEAP_OPTS heap_opts_replacement
fi
#######################################
# Replace every "_{<token>}" placeholder in the values of exported KAFKA_*
# variables with a fixed string and re-export the result.
# Values are treated as plain text: they are never re-parsed by the shell, so
# spaces, quotes, '$(...)' and embedded newlines survive unchanged.
# Arguments: $1 - placeholder token, e.g. HOSTNAME_COMMAND
#            $2 - text to put in place of "_{<token>}"
#######################################
_substitute_kafka_placeholder() {
  local token=$1
  local replacement=$2
  local placeholder="_{${token}}"
  local name current updated

  # Walk exported variable *names* (compgen -e) rather than the lines printed
  # by env, so a value spanning several lines is still one variable.
  while IFS= read -r name; do
    [[ "$name" == KAFKA_* ]] || continue
    current=${!name}
    [[ "$current" == *"$placeholder"* ]] || continue
    # Pattern and replacement are quoted so both are taken literally.
    updated=${current//"$placeholder"/"$replacement"}
    export "$name=$updated"
  done < <(compgen -e)
}

if [[ -n "$HOSTNAME_COMMAND" ]]; then
  # HOSTNAME_VALUE is whatever the user-supplied command prints.
  HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")
  # Replace any occurrences of _{HOSTNAME_COMMAND} with the value
  _substitute_kafka_placeholder HOSTNAME_COMMAND "$HOSTNAME_VALUE"
fi

if [[ -n "$PORT_COMMAND" ]]; then
  # PORT_VALUE is whatever the user-supplied command prints.
  PORT_VALUE=$(eval "$PORT_COMMAND")
  # Replace any occurrences of _{PORT_COMMAND} with the value
  _substitute_kafka_placeholder PORT_COMMAND "$PORT_VALUE"
fi
# Derive the broker rack from the user-supplied command, but never override a
# rack that was set explicitly.
if [[ -z "$KAFKA_BROKER_RACK" ]] && [[ -n "$RACK_COMMAND" ]]; then
  KAFKA_BROKER_RACK="$(eval "$RACK_COMMAND")"
  export KAFKA_BROKER_RACK
fi
# Make sure there is enough listener information to start a broker; give up
# with an error message otherwise.
if [[ -z "${KAFKA_ADVERTISED_HOST_NAME}${KAFKA_LISTENERS}" ]]; then
  # Advertised listeners on their own are not accepted.
  if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
    echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
    exit 1
  fi

  # Nothing was configured and no HOSTNAME_COMMAND result is available either.
  if [[ -z "$HOSTNAME_VALUE" ]]; then
    echo "ERROR: No listener or advertised hostname configuration provided in environment."
    echo " Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
    exit 1
  fi

  # Maintain existing behaviour:
  # with no listeners defined, the HOSTNAME_COMMAND result becomes the
  # advertised.host.name value.
  export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi
# Append a newline to the broker config in case its last line is not
# terminated, so that properties appended later start on a line of their own.
printf '\n' >> "$KAFKA_HOME/config/server.properties"
(
#######################################
# Set "key=value" in a properties file.
# If the file already has a line starting with "key=" or "#key=" (a commented
# default), every such line is replaced; otherwise "key=value" is appended.
# Key and value are both handled as literal text: regex metacharacters in the
# key (e.g. the '.' in "broker.id") and '@', '&' or '\' in the value are
# escaped before they reach sed.
# NOTE: a value containing a newline cannot be written with this function.
# Arguments: $1 - property name
#            $2 - property value
#            $3 - path of the properties file (edited in place)
# Outputs:   one "[Configuring] ..." line on stdout; the value is not printed
#######################################
updateConfig() {
  local key=$1
  local value=$2
  local file=$3
  local key_pattern replacement

  # Omit $value here, in case there is sensitive information
  echo "[Configuring] '$key' in '$file'"

  # Escape ERE metacharacters and the '@' sed delimiter so the key only ever
  # matches itself.
  key_pattern=$(printf '%s' "$key" | sed -e 's/[][\\.*^$+?(){}|@]/\\&/g')
  # Escape what sed treats specially in a replacement: '\', '&' and the
  # delimiter.
  replacement=$(printf '%s' "$key=$value" | sed -e 's/[\\&@]/\\&/g')

  # If config exists in file, replace it. Otherwise, append to file.
  if grep -E -q -- "^#?${key_pattern}=" "$file"; then
    sed -r -i -e "s@^#?${key_pattern}=.*@${replacement}@g" "$file"
  else
    printf '%s\n' "$key=$value" >> "$file"
  fi
}
# Fixes #312
# Variables that configure the Kafka scripts / JVM rather than the broker and
# must therefore not be written to server.properties. The list was built from:
# KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-9_]* /opt/kafka/bin | sort | uniq | tr '\n' '|'
EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|"

# Walk the *names* of all exported variables (compgen -e) and read each value
# through indirect expansion. Unlike splitting the output of env on newlines,
# this keeps values containing spaces, carriage returns or newlines intact and
# never treats part of a multi-line value as a variable of its own. See #313
while IFS= read -r env_var; do
  if [[ "$EXCLUSIONS" == *"|$env_var|"* ]]; then
    echo "Excluding $env_var from broker config"
    continue
  fi

  # KAFKA_FOO_BAR=x  ->  foo.bar=x in server.properties
  if [[ "$env_var" == KAFKA_* ]]; then
    kafka_name=$(printf '%s' "${env_var#KAFKA_}" | tr '[:upper:]' '[:lower:]' | tr _ .)
    updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties"
  fi

  # LOG4J_FOO_BAR=x  ->  log4j.foo.bar=x in log4j.properties (prefix is kept)
  if [[ "$env_var" == LOG4J_* ]]; then
    log4j_name=$(printf '%s' "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .)
    updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties"
  fi
done < <(compgen -e)
)
# Optional user hook: when CUSTOM_INIT_SCRIPT is non-empty its text is
# evaluated in this shell, after the configuration files have been written and
# before the broker is started.
if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
eval "$CUSTOM_INIT_SCRIPT"
fi
# Replace this shell with the Kafka start script (exec, so no wrapper shell
# process stays behind), passing the server.properties file as its argument.
# Nothing placed after this line would run once exec succeeds.
exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"