Skip to content

Commit

Permalink
The elasticsearch ruby client isn't thread safe
Browse files Browse the repository at this point in the history
Since the ES Ruby client isn't thread safe, we currently use
thread-local storage to create one client per worker, which makes the
connection pool thread safe.

closes: logstash-plugins#76
  • Loading branch information
ph committed Sep 15, 2017
1 parent 8cb54e3 commit e40c079
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
31 changes: 23 additions & 8 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/namespace"
require_relative "elasticsearch/client"
require "logstash/json"
java_import "java.util.concurrent.ConcurrentHashMap"

# .Compatibility Note
# [NOTE]
Expand Down Expand Up @@ -125,14 +126,10 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
# Tags the event on failure to look up geo information. This can be used in later analysis.
config :tag_on_failure, :validate => :array, :default => ["_elasticsearch_lookup_failure"]

attr_reader :clients_pool

def register
options = {
:ssl => @ssl,
:hosts => @hosts,
:ca_file => @ca_file,
:logger => @logger
}
@client = LogStash::Filters::ElasticsearchClient.new(@user, @password, options)
@clients_pool = java.util.concurrent.ConcurrentHashMap.new

#Load query if it exists
if @query_template
Expand Down Expand Up @@ -162,7 +159,7 @@ def filter(event)

@logger.debug("Querying elasticsearch for lookup", :params => params)

results = @client.search(params)
results = get_client.search(params)
@fields.each do |old_key, new_key|
if !results['hits']['hits'].empty?
set = []
Expand All @@ -178,4 +175,22 @@ def filter(event)
end
filter_matched(event)
end # def filter

private
# Builds the options hash passed to every new Elasticsearch client,
# using this filter's @ssl, @hosts, @ca_file and @logger values.
def client_options
  { ssl: @ssl, hosts: @hosts, ca_file: @ca_file, logger: @logger }
end

# Creates a new ElasticsearchClient from the configured user, password
# and the hash returned by #client_options.
def new_client
  options = client_options
  LogStash::Filters::ElasticsearchClient.new(@user, @password, options)
end

# Returns the client stored in @clients_pool for the calling thread,
# creating it through #new_client when the pool has no entry for
# Thread.current yet.
def get_client
  build_client = lambda { |_thread| new_client }
  @clients_pool.computeIfAbsent(Thread.current, build_client)
end
end #class LogStash::Filters::Elasticsearch
2 changes: 1 addition & 1 deletion lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def initialize(user, password, options={})
# set ca_file even if ssl isn't on, since the host can be an https url
transport_options[:ssl] = { ca_file: options[:ca_file] } if options[:ca_file]

@logger.info("New ElasticSearch filter", :hosts => hosts)
@logger.info("New ElasticSearch filter client", :hosts => hosts)
@client = ::Elasticsearch::Client.new(hosts: hosts, transport_options: transport_options)
end

Expand Down
19 changes: 19 additions & 0 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@
plugin.register
end

# Clears the :filter_elasticsearch_client thread-local after each example.
# NOTE(review): the filter in this commit stores its clients in
# `clients_pool`, not in a thread-local; confirm this hook is still needed.
after(:each) { Thread.current[:filter_elasticsearch_client] = nil }

# Since the Elasticsearch Ruby client is not thread safe
# and under high load we can get errors with the connection pool,
# we have decided to create a new instance per worker thread, which
# will be lazily created on the first call to `#filter`.
#
# I am adding a simple test case for future changes.
it "uses a different connection object per thread wait" do
  # The pool starts empty: nothing has called `#filter` yet.
  expect(plugin.clients_pool.size).to eq(0)

  # Run the filter from two distinct threads, one after the other.
  2.times do
    worker = Thread.new { plugin.filter(event) }
    worker.join
  end

  # Each thread should have added its own entry to the pool.
  expect(plugin.clients_pool.size).to eq(2)
end

it "should enhance the current event with new data" do
plugin.filter(event)
expect(event.get("code")).to eq(404)
Expand Down

0 comments on commit e40c079

Please sign in to comment.