From 3deebe5586c96980c0439f3834464ae35985a280 Mon Sep 17 00:00:00 2001
From: arenard
Date: Sun, 24 Feb 2019 18:09:38 +0100
Subject: [PATCH] Added `reconnect_backoff_max_ms` option to set the maximum
 reconnection time for the exponential reconnection backoff

---
 CHANGELOG.md                 | 57 ++++++++++++++++++++----------------
 docs/index.asciidoc          | 20 ++++++++++---
 lib/logstash/inputs/kafka.rb | 12 ++++++--
 logstash-input-kafka.gemspec |  2 +-
 4 files changed, 58 insertions(+), 33 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ac971f2..cac0b8a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,24 +1,30 @@
+## 9.0.2
+  - Added `reconnect_backoff_max_ms` option to set the maximum reconnection time for the exponential reconnection backoff.
+    [#308](https://github.com/logstash-plugins/logstash-input-kafka/pull/308)
+
 ## 9.0.0
   - Removed obsolete `ssl` option
 
 ## 8.3.1
-  - Added support for kafka property ssl.endpoint.identification.algorithm #302(https://github.com/logstash-plugins/logstash-input-kafka/pull/302)
+  - Added support for kafka property ssl.endpoint.identification.algorithm
+    [#302](https://github.com/logstash-plugins/logstash-input-kafka/pull/302)
 
 ## 8.3.0
   - Changed Kafka client version to 2.1.0
 
 ## 8.2.1
-  - Changed Kafka client version to 2.0.1 [#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)
+  - Changed Kafka client version to 2.0.1
+    [#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)
 
 ## 8.2.0
   - Upgrade Kafka client to version 2.0.0
 
 ## 8.1.2
- - Docs: Correct list formatting for `decorate_events`
- - Docs: Add kafka default to `partition_assignment_strategy`
+  - Docs: Correct list formatting for `decorate_events`
+  - Docs: Add kafka default to `partition_assignment_strategy`
 
 ## 8.1.1
- - Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash
+  - Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash
 
 ## 8.1.0
   - Internal: Update build to gradle
@@ -115,35 +121,36 @@
 
 ## 4.0.0
   - Republish all the gems under jruby.
-  - Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility. See https://github.com/elastic/logstash/issues/5141
+  - Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility.
+    See https://github.com/elastic/logstash/issues/5141
   - Support for Kafka 0.9 for LS 5.x
 
 ## 3.0.0.beta7
- - Fix Log4j warnings by setting up the logger
+  - Fix Log4j warnings by setting up the logger
 
 ## 3.0.0.beta5 and 3.0.0.beta6
- - Internal: Use jar dependency
- - Fixed issue with snappy compression
+  - Internal: Use jar dependency
+  - Fixed issue with snappy compression
 
 ## 3.0.0.beta3 and 3.0.0.beta4
- - Internal: Update gemspec dependency
+  - Internal: Update gemspec dependency
 
 ## 3.0.0.beta2
- - internal: Use jar dependencies library instead of manually downloading jars
- - Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)
+  - internal: Use jar dependencies library instead of manually downloading jars
+  - Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)
 
 ## 3.0.0.beta2
- - Added SSL/TLS connection support to Kafka
- - Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
-   with inputs because inputs by default are single threaded. This makes it a bad
-   first user experience. Plain codec is a much better default.
+  - Added SSL/TLS connection support to Kafka
+  - Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
+    with inputs because inputs by default are single threaded. This makes it a bad
+    first user experience. Plain codec is a much better default.
 
 ## 3.0.0.beta1
- - Refactor to use new Java based consumer, bypassing jruby-kafka
- - Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible
+  - Refactor to use new Java based consumer, bypassing jruby-kafka
+  - Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible
 
 ## 2.0.7
- - Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.
+  - Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.
 
 ## 2.0.6
   - Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash
@@ -152,13 +159,13 @@
   - New dependency requirements for logstash-core for the 5.0 release
 
 ## 2.0.4
- - Fix safe shutdown while plugin waits on Kafka for new events
- - Expose auto_commit_interval_ms to control offset commit frequency
+  - Fix safe shutdown while plugin waits on Kafka for new events
+  - Expose auto_commit_interval_ms to control offset commit frequency
 
 ## 2.0.3
- - Fix infinite loop when no new messages are found in Kafka
+  - Fix infinite loop when no new messages are found in Kafka
 
 ## 2.0.0
- - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
-   instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
- - Dependency on logstash-core update to 2.0
+  - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
+    instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
+  - Dependency on logstash-core update to 2.0
diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index be86047..92f8a6c 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -102,7 +102,8 @@ https://kafka.apache.org/documentation for more details.
 | <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<string,string>>|No
-| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-reconnect_backoff_max_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
@@ -394,12 +395,23 @@ The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
 [id="plugins-{type}s-{plugin}-reconnect_backoff_ms"]
 ===== `reconnect_backoff_ms`
 
-  * Value type is <<string,string>>
+  * Value type is <<number,number>>
   * There is no default value for this setting.
 
-The amount of time to wait before attempting to reconnect to a given host.
+The base amount of time to wait before attempting to reconnect to a given host.
 This avoids repeatedly connecting to a host in a tight loop.
-This backoff applies to all requests sent by the consumer to the broker.
+This backoff applies to all connection attempts by the client to a broker.
+
+[id="plugins-{type}s-{plugin}-reconnect_backoff_max_ms"]
+===== `reconnect_backoff_max_ms`
+
+  * Value type is <<number,number>>
+  * There is no default value for this setting.
+
+The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect.
+If provided, the backoff per host will increase exponentially for each consecutive connection failure,
+up to this maximum.
+After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
 
[id="plugins-{type}s-{plugin}-request_timeout_ms"] ===== `request_timeout_ms` diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 4ce411e..5217372 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -133,10 +133,15 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base config :partition_assignment_strategy, :validate => :string # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. config :receive_buffer_bytes, :validate => :string - # The amount of time to wait before attempting to reconnect to a given host. + # The base amount of time to wait before attempting to reconnect to a given host. # This avoids repeatedly connecting to a host in a tight loop. - # This backoff applies to all requests sent by the consumer to the broker. - config :reconnect_backoff_ms, :validate => :string + # This backoff applies to all connection attempts by the client to a broker. + config :reconnect_backoff_ms, :validate => :number + # The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect. + # If provided, the backoff per host will increase exponentially for each consecutive connection failure, + # up to this maximum. + # After calculating the backoff increase, 20% random jitter is added to avoid connection storms. + config :reconnect_backoff_max_ms, :validate => :number # The configuration controls the maximum amount of time the client will wait # for the response of a request. If the response is not received before the timeout # elapses the client will resend the request if necessary or fail the request if @@ -306,6 +311,7 @@ def create_consumer(client_id) props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil? props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil? props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil? + props.put(kafka::RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnect_backoff_max_ms) unless reconnect_backoff_max_ms.nil? props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil? props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil? props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil? diff --git a/logstash-input-kafka.gemspec b/logstash-input-kafka.gemspec index 8434d4b..4d35bee 100644 --- a/logstash-input-kafka.gemspec +++ b/logstash-input-kafka.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-input-kafka' - s.version = '9.0.0' + s.version = '9.0.2' s.licenses = ['Apache-2.0'] s.summary = "Reads events from a Kafka topic" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"