Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added reconnect_backoff_max_ms option to set maximum reconnection time... #308

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
## 9.0.2
- Added `reconnect_backoff_max_ms` option to set maximum reconnection time to exponential reconnection backoff.
[#308](https://github.com/logstash-plugins/logstash-input-kafka/pull/308)

## 9.0.0
- Removed obsolete `ssl` option

## 8.3.1
- Added support for kafka property ssl.endpoint.identification.algorithm #302(https://github.com/logstash-plugins/logstash-input-kafka/pull/302)
- Added support for kafka property ssl.endpoint.identification.algorithm
[#302](https://github.com/logstash-plugins/logstash-input-kafka/pull/302)

## 8.3.0
- Changed Kafka client version to 2.1.0

## 8.2.1
- Changed Kafka client version to 2.0.1 [#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)
- Changed Kafka client version to 2.0.1
[#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)

## 8.2.0
- Upgrade Kafka client to version 2.0.0

## 8.1.2
- Docs: Correct list formatting for `decorate_events`
- Docs: Add kafka default to `partition_assignment_strategy`
- Docs: Correct list formatting for `decorate_events`
- Docs: Add kafka default to `partition_assignment_strategy`

## 8.1.1
- Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash
- Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash

## 8.1.0
- Internal: Update build to gradle
Expand Down Expand Up @@ -115,35 +121,36 @@

## 4.0.0
- Republish all the gems under jruby.
- Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility. See https://github.com/elastic/logstash/issues/5141
- Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility.
See https://github.com/elastic/logstash/issues/5141
- Support for Kafka 0.9 for LS 5.x

## 3.0.0.beta7
- Fix Log4j warnings by setting up the logger
- Fix Log4j warnings by setting up the logger

## 3.0.0.beta5 and 3.0.0.beta6
- Internal: Use jar dependency
- Fixed issue with snappy compression
- Internal: Use jar dependency
- Fixed issue with snappy compression

## 3.0.0.beta3 and 3.0.0.beta4
- Internal: Update gemspec dependency
- Internal: Update gemspec dependency

## 3.0.0.beta2
- internal: Use jar dependencies library instead of manually downloading jars
- Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)
- internal: Use jar dependencies library instead of manually downloading jars
- Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)

## 3.0.0.beta2
- Added SSL/TLS connection support to Kafka
- Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
with inputs because inputs by default are single threaded. This makes it a bad
first user experience. Plain codec is a much better default.
- Added SSL/TLS connection support to Kafka
- Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
with inputs because inputs by default are single threaded. This makes it a bad
first user experience. Plain codec is a much better default.

## 3.0.0.beta1
- Refactor to use new Java based consumer, bypassing jruby-kafka
- Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible
- Refactor to use new Java based consumer, bypassing jruby-kafka
- Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible

## 2.0.7
- Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.
- Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.

## 2.0.6
- Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash
Expand All @@ -152,13 +159,13 @@
- New dependency requirements for logstash-core for the 5.0 release

## 2.0.4
- Fix safe shutdown while plugin waits on Kafka for new events
- Expose auto_commit_interval_ms to control offset commit frequency
- Fix safe shutdown while plugin waits on Kafka for new events
- Expose auto_commit_interval_ms to control offset commit frequency

## 2.0.3
- Fix infinite loop when no new messages are found in Kafka
- Fix infinite loop when no new messages are found in Kafka

## 2.0.0
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
- Dependency on logstash-core update to 2.0
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
- Dependency on logstash-core update to 2.0
20 changes: 16 additions & 4 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ https://kafka.apache.org/documentation for more details.
| <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-reconnect_backoff_max_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
Expand Down Expand Up @@ -394,12 +395,23 @@ The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
[id="plugins-{type}s-{plugin}-reconnect_backoff_ms"]
===== `reconnect_backoff_ms`

* Value type is <<string,string>>
* Value type is <<number,number>>
* There is no default value for this setting.

The amount of time to wait before attempting to reconnect to a given host.
The base amount of time to wait before attempting to reconnect to a given host.
This avoids repeatedly connecting to a host in a tight loop.
This backoff applies to all requests sent by the consumer to the broker.
This backoff applies to all connection attempts by the client to a broker.

[id="plugins-{type}s-{plugin}-reconnect_backoff_max_ms"]
===== `reconnect_backoff_max_ms`

* Value type is <<number,number>>
* There is no default value for this setting.

The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect.
If provided, the backoff per host will increase exponentially for each consecutive connection failure,
up to this maximum.
After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

[id="plugins-{type}s-{plugin}-request_timeout_ms"]
===== `request_timeout_ms`
Expand Down
12 changes: 9 additions & 3 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,15 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
config :partition_assignment_strategy, :validate => :string
# The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
config :receive_buffer_bytes, :validate => :string
# The amount of time to wait before attempting to reconnect to a given host.
# The base amount of time to wait before attempting to reconnect to a given host.
# This avoids repeatedly connecting to a host in a tight loop.
# This backoff applies to all requests sent by the consumer to the broker.
config :reconnect_backoff_ms, :validate => :string
# This backoff applies to all connection attempts by the client to a broker.
config :reconnect_backoff_ms, :validate => :number
# The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect.
# If provided, the backoff per host will increase exponentially for each consecutive connection failure,
# up to this maximum.
# After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
config :reconnect_backoff_max_ms, :validate => :number
# The configuration controls the maximum amount of time the client will wait
# for the response of a request. If the response is not received before the timeout
# elapses the client will resend the request if necessary or fail the request if
Expand Down Expand Up @@ -306,6 +311,7 @@ def create_consumer(client_id)
props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil?
props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil?
props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
props.put(kafka::RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnect_backoff_max_ms) unless reconnect_backoff_max_ms.nil?
props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil?
props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil?
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-kafka'
s.version = '9.0.0'
s.version = '9.0.2'
s.licenses = ['Apache-2.0']
s.summary = "Reads events from a Kafka topic"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down