Skip to content

Commit

Permalink
Added reconnect_backoff_max_ms option to set maximum reconnection time
Browse files Browse the repository at this point in the history
to exponential reconnection backoff
  • Loading branch information
arenard committed Feb 24, 2019
1 parent 4adfec6 commit 3deebe5
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 33 deletions.
57 changes: 32 additions & 25 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
## 9.0.2
- Added `reconnect_backoff_max_ms` option to set maximum reconnection time to exponential reconnection backoff.
[#308](https://github.com/logstash-plugins/logstash-input-kafka/pull/308)

## 9.0.0
- Removed obsolete `ssl` option

## 8.3.1
- Added support for kafka property ssl.endpoint.identification.algorithm #302(https://github.com/logstash-plugins/logstash-input-kafka/pull/302)
- Added support for kafka property ssl.endpoint.identification.algorithm
[#302](https://github.com/logstash-plugins/logstash-input-kafka/pull/302)

## 8.3.0
- Changed Kafka client version to 2.1.0

## 8.2.1
- Changed Kafka client version to 2.0.1 [#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)
- Changed Kafka client version to 2.0.1
[#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)

## 8.2.0
- Upgrade Kafka client to version 2.0.0

## 8.1.2
- Docs: Correct list formatting for `decorate_events`
- Docs: Add kafka default to `partition_assignment_strategy`
- Docs: Correct list formatting for `decorate_events`
- Docs: Add kafka default to `partition_assignment_strategy`

## 8.1.1
- Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash
- Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash

## 8.1.0
- Internal: Update build to gradle
Expand Down Expand Up @@ -115,35 +121,36 @@

## 4.0.0
- Republish all the gems under jruby.
- Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility. See https://github.com/elastic/logstash/issues/5141
- Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility.
See https://github.com/elastic/logstash/issues/5141
- Support for Kafka 0.9 for LS 5.x

## 3.0.0.beta7
- Fix Log4j warnings by setting up the logger
- Fix Log4j warnings by setting up the logger

## 3.0.0.beta5 and 3.0.0.beta6
- Internal: Use jar dependency
- Fixed issue with snappy compression
- Internal: Use jar dependency
- Fixed issue with snappy compression

## 3.0.0.beta3 and 3.0.0.beta4
- Internal: Update gemspec dependency
- Internal: Update gemspec dependency

## 3.0.0.beta2
- internal: Use jar dependencies library instead of manually downloading jars
- Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)
- internal: Use jar dependencies library instead of manually downloading jars
- Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)

## 3.0.0.beta2
- Added SSL/TLS connection support to Kafka
- Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
with inputs because inputs by default are single threaded. This makes it a bad
first user experience. Plain codec is a much better default.
- Added SSL/TLS connection support to Kafka
- Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
with inputs because inputs by default are single threaded. This makes it a bad
first user experience. Plain codec is a much better default.

## 3.0.0.beta1
- Refactor to use new Java based consumer, bypassing jruby-kafka
- Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible
- Refactor to use new Java based consumer, bypassing jruby-kafka
- Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible

## 2.0.7
- Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.
- Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.

## 2.0.6
- Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash
Expand All @@ -152,13 +159,13 @@
- New dependency requirements for logstash-core for the 5.0 release

## 2.0.4
- Fix safe shutdown while plugin waits on Kafka for new events
- Expose auto_commit_interval_ms to control offset commit frequency
- Fix safe shutdown while plugin waits on Kafka for new events
- Expose auto_commit_interval_ms to control offset commit frequency

## 2.0.3
- Fix infinite loop when no new messages are found in Kafka
- Fix infinite loop when no new messages are found in Kafka

## 2.0.0
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
- Dependency on logstash-core update to 2.0
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
- Dependency on logstash-core update to 2.0
20 changes: 16 additions & 4 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ https://kafka.apache.org/documentation for more details.
| <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_max_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
Expand Down Expand Up @@ -394,12 +395,23 @@ The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
[id="plugins-{type}s-{plugin}-reconnect_backoff_ms"]
===== `reconnect_backoff_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The amount of time to wait before attempting to reconnect to a given host.
The base amount of time to wait before attempting to reconnect to a given host.
This avoids repeatedly connecting to a host in a tight loop.
This backoff applies to all requests sent by the consumer to the broker.
This backoff applies to all connection attempts by the client to a broker.

[id="plugins-{type}s-{plugin}-reconnect_backoff_max_ms"]
===== `reconnect_backoff_max_ms`

* Value type is <<number,number>>
* There is no default value for this setting.

The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect.
If provided, the backoff per host will increase exponentially for each consecutive connection failure,
up to this maximum.
After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

[id="plugins-{type}s-{plugin}-request_timeout_ms"]
===== `request_timeout_ms`
Expand Down
12 changes: 9 additions & 3 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,15 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
config :partition_assignment_strategy, :validate => :string
# The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
config :receive_buffer_bytes, :validate => :string
# The amount of time to wait before attempting to reconnect to a given host.
# The base amount of time to wait before attempting to reconnect to a given host.
# This avoids repeatedly connecting to a host in a tight loop.
# This backoff applies to all requests sent by the consumer to the broker.
config :reconnect_backoff_ms, :validate => :string
# This backoff applies to all connection attempts by the client to a broker.
config :reconnect_backoff_ms, :validate => :number
# The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect.
# If provided, the backoff per host will increase exponentially for each consecutive connection failure,
# up to this maximum.
# After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
config :reconnect_backoff_max_ms, :validate => :number
# The configuration controls the maximum amount of time the client will wait
# for the response of a request. If the response is not received before the timeout
# elapses the client will resend the request if necessary or fail the request if
Expand Down Expand Up @@ -306,6 +311,7 @@ def create_consumer(client_id)
props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil?
props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil?
props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
props.put(kafka::RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnect_backoff_max_ms) unless reconnect_backoff_max_ms.nil?
props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil?
props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil?
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-kafka'
s.version = '9.0.0'
s.version = '9.0.2'
s.licenses = ['Apache-2.0']
s.summary = "Reads events from a Kafka topic"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down

0 comments on commit 3deebe5

Please sign in to comment.