From fb19d0ee012571ab1f71fa85f6961f2a6301a5ea Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 24 May 2023 17:36:23 -0700 Subject: [PATCH] Add `TimerTask.new(interval_type:)` option to configure interval calculation Can be either `:fixed_delay` or `:fixed_rate`, default to `:fixed_delay` --- lib/concurrent-ruby/concurrent/timer_task.rb | 54 +++++++++++++- spec/concurrent/timer_task_spec.rb | 74 +++++++++++++++++++- 2 files changed, 124 insertions(+), 4 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/timer_task.rb b/lib/concurrent-ruby/concurrent/timer_task.rb index 69dff3e01..dd2037f62 100644 --- a/lib/concurrent-ruby/concurrent/timer_task.rb +++ b/lib/concurrent-ruby/concurrent/timer_task.rb @@ -32,6 +32,17 @@ module Concurrent # be tested separately then passed to the `TimerTask` for scheduling and # running. # + # A `TimerTask` supports two different types of interval calculations. + # A fixed delay will always wait the same amount of time between the + # completion of one task and the start of the next. A fixed rate will + # attempt to maintain a constant rate of execution regardless of the + # duration of the task. For example, if a fixed rate task is scheduled + # to run every 60 seconds but the task itself takes 10 seconds to + # complete, the next task will be scheduled to run 50 seconds after + # the start of the previous task. If the task takes 70 seconds to + # complete, the next task will be start immediately after the previous + # task completes. Tasks will not be executed concurrently. + # # In some cases it may be necessary for a `TimerTask` to affect its own # execution cycle. To facilitate this, a reference to the TimerTask instance # is passed as an argument to the provided block every time the task is @@ -74,6 +85,12 @@ module Concurrent # # #=> 'Boom!' # + # @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay + # task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do + # puts 'Boom!' + # end + # task.interval_type #=> :fixed_rate + # # @example Last `#value` and `Dereferenceable` mixin # task = Concurrent::TimerTask.new( # dup_on_deref: true, @@ -152,8 +169,16 @@ class TimerTask < RubyExecutorService # Default `:execution_interval` in seconds. EXECUTION_INTERVAL = 60 - # Default `:timeout_interval` in seconds. - TIMEOUT_INTERVAL = 30 + # Maintain the interval between the end of one execution and the start of the next execution. + FIXED_DELAY = :fixed_delay + + # Maintain the interval between the start of one execution and the start of the next. + # If execution time exceeds the interval, the next execution will start immediately + # after the previous execution finishes. Executions will not run concurrently. + FIXED_RATE = :fixed_rate + + # Default `:interval_type` + DEFAULT_INTERVAL_TYPE = FIXED_DELAY # Create a new TimerTask with the given task and configuration. # @@ -164,6 +189,9 @@ class TimerTask < RubyExecutorService # @option opts [Boolean] :run_now Whether to run the task immediately # upon instantiation or to wait until the first # execution_interval # has passed (default: false) + # @options opts [Symbol] :interval_type method to calculate the interval + # between executions, can be either :fixed_rate or :fixed_delay. + # (default: :fixed_delay) # @option opts [Executor] executor, default is `global_io_executor` # # @!macro deref_options @@ -243,6 +271,10 @@ def execution_interval=(value) end end + # @!attribute [r] interval_type + # @return [Symbol] method to calculate the interval between executions + attr_reader :interval_type + # @!attribute [rw] timeout_interval # @return [Fixnum] Number of seconds the task can run before it is # considered to have failed. @@ -265,10 +297,15 @@ def ns_initialize(opts, &task) set_deref_options(opts) self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL + if opts[:interval_type] && ![FIXED_DELAY, FIXED_RATE].include?(opts[:interval_type]) + raise ArgumentError.new('interval_type must be either :fixed_delay or :fixed_rate') + end if opts[:timeout] || opts[:timeout_interval] warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly' end + @run_now = opts[:now] || opts[:run_now] + @interval_type = opts[:interval_type] || DEFAULT_INTERVAL_TYPE @task = Concurrent::SafeTaskExecutor.new(task) @executor = opts[:executor] || Concurrent.global_io_executor @running = Concurrent::AtomicBoolean.new(false) @@ -298,10 +335,11 @@ def schedule_next_task(interval = execution_interval) # @!visibility private def execute_task(completion) return nil unless @running.true? + start_time = Concurrent.monotonic_time _success, value, reason = @task.execute(self) if completion.try? self.value = value - schedule_next_task + schedule_next_task(calculate_next_interval(start_time)) time = Time.now observers.notify_observers do [time, self.value, reason] @@ -309,5 +347,15 @@ def execute_task(completion) end nil end + + # @!visibility private + def calculate_next_interval(start_time) + if @interval_type == FIXED_RATE + run_time = Concurrent.monotonic_time - start_time + [execution_interval - run_time, 0].max + else # FIXED_DELAY + execution_interval + end + end end end diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 0cb61cc3d..10fbf34db 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -83,6 +83,21 @@ def trigger_observable(observable) expect(subject.execution_interval).to eq 5 end + it 'raises an exception if :interval_type is not a valid value' do + expect { + Concurrent::TimerTask.new(interval_type: :cat) { nil } + }.to raise_error(ArgumentError) + end + + it 'uses the default :interval_type when no type is given' do + subject = TimerTask.new { nil } + expect(subject.interval_type).to eq TimerTask::FIXED_DELAY + end + + it 'uses the given interval type' do + subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE) { nil } + expect(subject.interval_type).to eq TimerTask::FIXED_RATE + end end context '#kill' do @@ -113,7 +128,6 @@ def trigger_observable(observable) end specify '#execution_interval is writeable' do - latch = CountDownLatch.new(1) subject = TimerTask.new(timeout_interval: 1, execution_interval: 1, @@ -133,6 +147,28 @@ def trigger_observable(observable) subject.kill end + it 'raises on invalid interval_type' do + expect { + fixed_delay = TimerTask.new(interval_type: TimerTask::FIXED_DELAY, + execution_interval: 0.1, + run_now: true) { nil } + fixed_delay.kill + }.not_to raise_error + + expect { + fixed_rate = TimerTask.new(interval_type: TimerTask::FIXED_RATE, + execution_interval: 0.1, + run_now: true) { nil } + fixed_rate.kill + }.not_to raise_error + + expect { + TimerTask.new(interval_type: :unknown, + execution_interval: 0.1, + run_now: true) { nil } + }.to raise_error(ArgumentError) + end + specify '#timeout_interval being written produces a warning' do subject = TimerTask.new(timeout_interval: 1, execution_interval: 0.1, @@ -209,6 +245,42 @@ def trigger_observable(observable) expect(executor).to have_received(:post) end + + it 'uses a fixed delay when set' do + finished = [] + latch = CountDownLatch.new(2) + subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY, + execution_interval: 0.1, + run_now: true) do |task| + sleep(0.2) + finished << Concurrent.monotonic_time + latch.count_down + end + subject.execute + latch.wait(1) + subject.kill + + expect(latch.count).to eq(0) + expect(finished[1] - finished[0]).to be >= 0.3 + end + + it 'uses a fixed rate when set' do + finished = [] + latch = CountDownLatch.new(2) + subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE, + execution_interval: 0.1, + run_now: true) do |task| + sleep(0.2) + finished << Concurrent.monotonic_time + latch.count_down + end + subject.execute + latch.wait(1) + subject.kill + + expect(latch.count).to eq(0) + expect(finished[1] - finished[0]).to be < 0.3 + end end context 'observation' do