diff --git a/CHANGELOG.md b/CHANGELOG.md index 24057a2..c6b59ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.2.3 + - Performance: avoid contention on scheduler execution [#103](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/103) + ## 5.2.2 - Feat: name scheduler threads + redirect error logging [#102](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/102) diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 2b598e8..3ad54af 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -296,7 +296,13 @@ def run(queue) if @schedule # input thread (Java) name example "[my-oracle] 1, :thread_name => "[#{id}] 1, + :thread_name => "[#{id}] 1.0, ) @scheduler.schedule_cron @schedule do execute_query(queue) diff --git a/lib/logstash/plugin_mixins/jdbc/scheduler.rb b/lib/logstash/plugin_mixins/jdbc/scheduler.rb index 489616a..ac41cb4 100644 --- a/lib/logstash/plugin_mixins/jdbc/scheduler.rb +++ b/lib/logstash/plugin_mixins/jdbc/scheduler.rb @@ -12,6 +12,42 @@ class Scheduler < Rufus::Scheduler TimeImpl = defined?(Rufus::Scheduler::EoTime) ? Rufus::Scheduler::EoTime : (defined?(Rufus::Scheduler::ZoTime) ? Rufus::Scheduler::ZoTime : ::Time) + # @overload + def timeout_jobs + # Rufus relies on `Thread.list` which is a blocking operation and with many schedulers + # (and threads) within LS will have a negative impact on performance as scheduler + # threads will end up waiting to obtain the `Thread.list` lock. 
+ # + # However, this isn't necessary: we can easily detect whether any job might need + # to time out, because worker thread(s) only get a non-nil + # `Thread.current[:rufus_scheduler_timeout]` when `@opts[:timeout]` is set. + return unless @opts[:timeout] + super + end + + # @overload + def work_threads(query = :all) + if query == :__all_no_cache__ # special case from JobDecorator#start_work_thread + @_work_threads = nil # when a new worker thread is being added reset + return super(:all) + end + + # Gets executed every time a job is triggered, we're going to cache the + # worker threads for this scheduler (to avoid `Thread.list`) - they only + # change when a new thread is being started from #start_work_thread ... + work_threads = @_work_threads + if work_threads.nil? + work_threads = threads.select { |t| t[:rufus_scheduler_work_thread] } + @_work_threads = work_threads + end + + case query + when :active then work_threads.select { |t| t[:rufus_scheduler_job] } + when :vacant then work_threads.reject { |t| t[:rufus_scheduler_job] } + else work_threads + end + end + # @overload def on_error(job, err) details = { exception: err.class, message: err.message, backtrace: err.backtrace } @@ -76,10 +112,10 @@ def start_work_thread ret = super() # does not return Thread instance in 3.0 - work_threads = @scheduler.work_threads + work_threads = @scheduler.work_threads(:__all_no_cache__) while prev_thread_count == work_threads.size # very unlikely Thread.pass - work_threads = @scheduler.work_threads + work_threads = @scheduler.work_threads(:__all_no_cache__) end work_thread_name_prefix = @scheduler.work_thread_name_prefix diff --git a/logstash-integration-jdbc.gemspec b/logstash-integration-jdbc.gemspec index 70d8b56..372c791 100755 --- a/logstash-integration-jdbc.gemspec +++ b/logstash-integration-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-jdbc' - s.version = '5.2.2' + s.version = '5.2.3' s.licenses = 
['Apache License (2.0)'] s.summary = "Integration with JDBC - input and filter plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/plugin_mixins/jdbc/scheduler_spec.rb b/spec/plugin_mixins/jdbc/scheduler_spec.rb index 62b115d..f264a12 100644 --- a/spec/plugin_mixins/jdbc/scheduler_spec.rb +++ b/spec/plugin_mixins/jdbc/scheduler_spec.rb @@ -49,4 +49,30 @@ end + context 'work threads' do + + let(:opts) { super().merge :max_work_threads => 3 } + + let(:counter) { java.util.concurrent.atomic.AtomicLong.new(0) } + + before do + scheduler.schedule_cron('* * * * * *') { counter.increment_and_get; sleep 3.25 } # every second + end + + it "are working" do + sleep(0.05) while counter.get == 0 + expect( scheduler.work_threads.size ).to eql 1 + sleep(0.05) while counter.get == 1 + expect( scheduler.work_threads.size ).to eql 2 + sleep(0.05) while counter.get == 2 + expect( scheduler.work_threads.size ).to eql 3 + + sleep 1.25 + expect( scheduler.work_threads.size ).to eql 3 + sleep 1.25 + expect( scheduler.work_threads.size ).to eql 3 + end + + end + end \ No newline at end of file