Skip to content

Commit

Permalink
Allow TimerTask to be safely restarted after shutdown and avoid dupli…
Browse files Browse the repository at this point in the history
…cate tasks
  • Loading branch information
bensheldon committed Mar 1, 2024
1 parent eae2851 commit af983a1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
9 changes: 7 additions & 2 deletions lib/concurrent-ruby/concurrent/timer_task.rb
Expand Up @@ -2,6 +2,7 @@
require 'concurrent/concern/dereferenceable'
require 'concurrent/concern/observable'
require 'concurrent/atomic/atomic_boolean'
require 'concurrent/atomic/atomic_fixnum'
require 'concurrent/executor/executor_service'
require 'concurrent/executor/ruby_executor_service'
require 'concurrent/executor/safe_task_executor'
Expand Down Expand Up @@ -236,6 +237,7 @@ def execute
synchronize do
if @running.false?
@running.make_true
@age.increment
schedule_next_task(@run_now ? 0 : @execution_interval)
end
end
Expand Down Expand Up @@ -309,6 +311,7 @@ def ns_initialize(opts, &task)
@task = Concurrent::SafeTaskExecutor.new(task)
@executor = opts[:executor] || Concurrent.global_io_executor
@running = Concurrent::AtomicBoolean.new(false)
@age = Concurrent::AtomicFixnum.new(0)
@value = nil

self.observers = Collection::CopyOnNotifyObserverSet.new
Expand All @@ -328,13 +331,15 @@ def ns_kill_execution

# @!visibility private
def schedule_next_task(interval = execution_interval)
ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new], &method(:execute_task))
ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new, @age.value], &method(:execute_task))
nil
end

# @!visibility private
def execute_task(completion)
def execute_task(completion, age_when_scheduled)
return nil unless @running.true?
return nil unless @age.value == age_when_scheduled

start_time = Concurrent.monotonic_time
_success, value, reason = @task.execute(self)
if completion.try?
Expand Down
14 changes: 14 additions & 0 deletions spec/concurrent/timer_task_spec.rb
@@ -1,5 +1,6 @@
require_relative 'concern/dereferenceable_shared'
require_relative 'concern/observable_shared'
require 'concurrent/atomic/atomic_fixnum'
require 'concurrent/timer_task'

module Concurrent
Expand Down Expand Up @@ -116,6 +117,19 @@ def trigger_observable(observable)
sleep(0.1)
expect(task.shutdown).to be_truthy
end

it 'will cancel pre-shutdown task even if restarted to avoid double-runs' do
counter = Concurrent::AtomicFixnum.new(0)
task = TimerTask.execute(execution_interval: 0.2, run_now: true) { counter.increment }
sleep 0.05
expect(counter.value).to eq 1

task.shutdown
task.execute

sleep 0.25
expect(counter.value).to eq 3
end
end
end

Expand Down

0 comments on commit af983a1

Please sign in to comment.