-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathconfig.sample.toml
140 lines (115 loc) · 3.37 KB
/
config.sample.toml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
[app]
log_level = "debug"
metrics_server_addr = ":7081"
# Map of topics from the source to sync to the target.
# source_topic => target_topic:optional_target_partition
# If the target partition is not specified, whatever partition a message
# was received from the source, the same partition is written to the target.
[[topics]]
source_topic1 = "target_topic1:1"
source_topic2 = "target_topic2"
[source_pool]
# Kafka client config common to all upstream sources ([[sources]]).
initial_offset = "start"
# Frequency at which source servers are polled for health/lag.
healthcheck_interval = "3s"
# Max difference in total offsets across all topics on a source
# against other sources, which when breached, the source is marked
# as unhealthy because of a lag.
offset_lag_threshold = 1000
# Maximum number of connect/fetch retries before exiting. -1 for infinite.
max_retries = -1
# Kafka exponential retry-backoff config for reconnection attempts.
# If both min and max values are the same, then it's a static wait time
# between attempts.
backoff_enable = true
backoff_min = "2s"
backoff_max = "10s"
# Wait timeout of a request/response to a Kafka instance to determine
# whether it's healthy or not.
request_timeout = "100ms"
# Pick a random server from the [[sources]] list to connect first on boot
# instead of the first one from the list. This can be useful for testing
# servers in production environments that may never be consumed from except
# during rare failover events.
randomize_initial = false
[[sources]]
name = "node1"
servers = ["127.0.0.1:9092"]
session_timeout = "6s"
enable_auth = true
sasl_mechanism = "PLAIN"
username = "user-x"
password = "pass-x"
max_wait_time = "10ms"
max_failovers = -1 # infinite
enable_tls = false
client_key_path = ""
client_cert_path = ""
ca_cert_path = ""
enable_log = false
[[sources]]
name = "node2"
servers = ["node2:9092"]
session_timeout = "6s"
enable_auth = true
sasl_mechanism = "PLAIN"
username = "user-x"
password = "pass-x"
max_wait_time = "10ms"
max_failovers = -1 # infinite
enable_tls = false
client_key_path = ""
client_cert_path = ""
ca_cert_path = ""
enable_log = false
# Destination kafka producer configuration
[target]
name = "node3"
servers = ["127.0.0.1:9095"]
enable_log = false
enable_auth = true
sasl_mechanism = "PLAIN" # PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
username = "user-y"
password = "pass-y"
enable_idempotency = true
commit_ack_type = "cluster"
flush_frequency = "20ms"
session_timeout = "6s"
enable_tls = false
client_key_path = ""
client_cert_path = ""
ca_cert_path = ""
# -1 for infinite.
max_retries = -1
flush_batch_size = 1000
batch_size = 1000
buffer_size = 100000 # channel buffer length
max_message_bytes = 10000000
# Kafka exponential retry-backoff config for reconnection attempts.
# If both min and max values are the same, then it's a static wait time
# between attempts.
backoff_enable = true
backoff_min = "2s"
backoff_max = "10s"
# Wait timeout of a request/response to a Kafka instance to determine
# whether it's healthy or not.
request_timeout = "100ms"
# Custom go-plugin filter to load to filter messages when relaying
[filters.test]
enabled = false
path = "test.bin"
config = '''
{
"address": ["127.0.0.1:6379"],
"username": "",
"password": "",
"db": 10,
"max_active": 50,
"max_idle": 20,
"dial_timeout": 3000,
"read_timeout": 3000,
"write_timeout": 3000,
"idle_timeout": 30000
}
'''