diff --git a/Changes.md b/Changes.md index eb4ac9acd..69927a5ab 100644 --- a/Changes.md +++ b/Changes.md @@ -5,6 +5,15 @@ HEAD --------- +- Add `Sidekiq::Job` alias for `Sidekiq::Worker` [#4995] + `Sidekiq::Job` is recommended for future code; it is functionally + identical to `Sidekiq::Worker` but better reflects current terminology. + There is no plan to deprecate `Sidekiq::Worker` at this time. +```ruby +class MyJob + include Sidekiq::Job + sidekiq_options ... +``` - Minimize scheduler load on Redis at scale [#4882] - Improve logging of delay jobs [#4904, BuonOno] - Minor CSS improvements for buttons and tables, design PRs always welcome! diff --git a/lib/generators/sidekiq/templates/worker.rb.erb b/lib/generators/sidekiq/templates/worker.rb.erb index b98f08634..64b624fff 100644 --- a/lib/generators/sidekiq/templates/worker.rb.erb +++ b/lib/generators/sidekiq/templates/worker.rb.erb @@ -1,9 +1,9 @@ <% module_namespacing do -%> -class <%= class_name %>Worker - include Sidekiq::Worker +class <%= class_name %>Job + include Sidekiq::Job def perform(*args) # Do something end end -<% end -%> \ No newline at end of file +<% end -%> diff --git a/lib/generators/sidekiq/templates/worker_spec.rb.erb b/lib/generators/sidekiq/templates/worker_spec.rb.erb index 0c07e2913..d9eaf1c88 100644 --- a/lib/generators/sidekiq/templates/worker_spec.rb.erb +++ b/lib/generators/sidekiq/templates/worker_spec.rb.erb @@ -1,6 +1,6 @@ require 'rails_helper' <% module_namespacing do -%> -RSpec.describe <%= class_name %>Worker, type: :worker do +RSpec.describe <%= class_name %>Job, type: :worker do pending "add some examples to (or delete) #{__FILE__}" end <% end -%> diff --git a/lib/generators/sidekiq/templates/worker_test.rb.erb b/lib/generators/sidekiq/templates/worker_test.rb.erb index d38d99d51..4cd8bc83e 100644 --- a/lib/generators/sidekiq/templates/worker_test.rb.erb +++ b/lib/generators/sidekiq/templates/worker_test.rb.erb @@ -1,6 +1,6 @@ require 'test_helper' <% module_namespacing do -%> -class <%= class_name %>WorkerTest < Minitest::Test +class <%= class_name %>JobTest < Minitest::Test def test_example skip "add some examples to (or delete) #{__FILE__}" end diff --git a/lib/generators/sidekiq/worker_generator.rb b/lib/generators/sidekiq/worker_generator.rb index 0a769827a..c2ad58516 100644 --- a/lib/generators/sidekiq/worker_generator.rb +++ b/lib/generators/sidekiq/worker_generator.rb @@ -3,16 +3,16 @@ module Sidekiq module Generators # :nodoc: class WorkerGenerator < ::Rails::Generators::NamedBase # :nodoc: - desc "This generator creates a Sidekiq Worker in app/workers and a corresponding test" + desc "This generator creates a Sidekiq::Job in app/workers and a corresponding test" - check_class_collision suffix: "Worker" + check_class_collision suffix: "Job" def self.default_generator_root File.dirname(__FILE__) end def create_worker_file - template "worker.rb.erb", File.join("app/workers", class_path, "#{file_name}_worker.rb") + template "worker.rb.erb", File.join("app/workers", class_path, "#{file_name}_job.rb") end def create_test_file @@ -31,7 +31,7 @@ def create_worker_spec template_file = File.join( "spec/workers", class_path, - "#{file_name}_worker_spec.rb" + "#{file_name}_job_spec.rb" ) template "worker_spec.rb.erb", template_file end @@ -40,13 +40,13 @@ def create_worker_test template_file = File.join( "test/workers", class_path, - "#{file_name}_worker_test.rb" + "#{file_name}_job_test.rb" ) template "worker_test.rb.erb", template_file end def file_name - @_file_name ||= super.sub(/_?worker\z/i, "") + @_file_name ||= super.sub(/_?job\z/i, "") end def test_framework diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 3ee4f9d26..dc9f39919 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -38,7 +38,9 @@ module Sidekiq reloader: proc { |&block| block.call } } - DEFAULT_WORKER_OPTIONS = { + # Do not use this constant, use: + # Sidekiq.default_job_options = { ...options... } + DEFAULT_JOB_OPTIONS = { "retry" => true, "queue" => "default" } @@ -154,13 +156,13 @@ def self.default_server_middleware Middleware::Chain.new end - def self.default_worker_options=(hash) + def self.default_job_options=(hash) # stringify - @default_worker_options = default_worker_options.merge(hash.transform_keys(&:to_s)) + @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s)) end - def self.default_worker_options - defined?(@default_worker_options) ? @default_worker_options : DEFAULT_WORKER_OPTIONS + def self.default_job_options + defined?(@default_job_options) ? @default_job_options : DEFAULT_JOB_OPTIONS end ## @@ -252,10 +254,10 @@ def self.on(event, &block) # We are shutting down Sidekiq but what about workers that # are working on some long job? This error is - # raised in workers that have not finished within the hard + # raised in jobs that have not finished within the hard # timeout limit. This is needed to rollback db transactions, # otherwise Ruby's Thread#kill will commit. See #377. - # DO NOT RESCUE THIS ERROR IN YOUR WORKERS + # DO NOT RESCUE THIS ERROR IN YOUR JOBS class Shutdown < Interrupt; end end diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 349a36f74..2e7b764b8 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -205,7 +205,7 @@ def date_stat_hash(stat) # # queue = Sidekiq::Queue.new("mailer") # queue.each do |job| - # job.klass # => 'MyWorker' + # job.klass # => 'MyJob' # job.args # => [1, 2, 3] # job.delete if job.jid == 'abcdef1234567890' # end @@ -267,7 +267,7 @@ def each break if entries.empty? page += 1 entries.each do |entry| - yield Job.new(entry, @name) + yield JobRecord.new(entry, @name) end deleted_size = initial_size - size end @@ -298,9 +298,9 @@ def clear # sorted set. # # The job should be considered immutable but may be - # removed from the queue via Job#delete. + # removed from the queue via JobRecord#delete. # - class Job + class JobRecord attr_reader :item attr_reader :value @@ -457,7 +457,7 @@ def uncompress_backtrace(backtrace) end end - class SortedEntry < Job + class SortedEntry < JobRecord attr_reader :score attr_reader :parent diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index df6339a67..4bc1adeee 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -287,7 +287,7 @@ def validate! (File.directory?(options[:require]) && !File.exist?("#{options[:require]}/config/application.rb")) logger.info "==================================================================" logger.info " Please point Sidekiq to a Rails application or a Ruby file " - logger.info " to load your worker classes with -r [DIR|FILE]." + logger.info " to load your job classes with -r [DIR|FILE]." logger.info "==================================================================" logger.info @parser die(1) @@ -328,7 +328,7 @@ def option_parser(opts) parse_queue opts, queue, weight end - o.on "-r", "--require [PATH|DIR]", "Location of Rails application with workers or file to require" do |arg| + o.on "-r", "--require [PATH|DIR]", "Location of Rails application with jobs or file to require" do |arg| opts[:require] = arg end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index c7e098c7b..e26dbd7ac 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -12,7 +12,7 @@ class Client # client.middleware do |chain| # chain.use MyClientMiddleware # end - # client.push('class' => 'SomeWorker', 'args' => [1,2,3]) + # client.push('class' => 'SomeJob', 'args' => [1,2,3]) # # All client instances default to the globally-defined # Sidekiq.client_middleware but you can change as necessary. @@ -46,16 +46,16 @@ def initialize(redis_pool = nil) # The main method used to push a job to Redis. Accepts a number of options: # # queue - the named queue to use, default 'default' - # class - the worker class to call, required + # class - the job class to call, required # args - an array of simple arguments to the perform method, must be JSON-serializable # at - timestamp to schedule the job (optional), must be Numeric (e.g. Time.now.to_f) # retry - whether to retry this job if it fails, default true or an integer number of retries # backtrace - whether to save any error backtrace, default false # # If class is set to the class name, the jobs' options will be based on Sidekiq's default - # worker options. Otherwise, they will be based on the job class's options. + # job options. Otherwise, they will be based on the job class's options. # - # Any options valid for a worker class's sidekiq_options are also available here. + # Any options valid for a job class's sidekiq_options are also available here. # # All options must be strings, not symbols. NB: because we are serializing to JSON, all # symbols in 'args' will be converted to strings. Note that +backtrace: true+ can take quite a bit of @@ -64,7 +64,7 @@ def initialize(redis_pool = nil) # Returns a unique Job ID. If middleware stops the job, nil will be returned instead. # # Example: - # push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) + # push('queue' => 'my_queue', 'class' => MyJob, 'args' => ['foo', 1, :bat => 'bar']) # def push(item) normed = normalize_item(item) @@ -116,8 +116,8 @@ def push_bulk(items) # # pool = ConnectionPool.new { Redis.new } # Sidekiq::Client.via(pool) do - # SomeWorker.perform_async(1,2,3) - # SomeOtherWorker.perform_async(1,2,3) + # SomeJob.perform_async(1,2,3) + # SomeOtherJob.perform_async(1,2,3) # end # # Generally this is only needed for very large Sidekiq installs processing @@ -142,10 +142,10 @@ def push_bulk(items) end # Resque compatibility helpers. Note all helpers - # should go through Worker#client_push. + # should go through Job#client_push. # # Example usage: - # Sidekiq::Client.enqueue(MyWorker, 'foo', 1, :bat => 'bar') + # Sidekiq::Client.enqueue(MyJob, 'foo', 1, :bat => 'bar') # # Messages are enqueued to the 'default' queue. # @@ -154,14 +154,14 @@ def enqueue(klass, *args) end # Example usage: - # Sidekiq::Client.enqueue_to(:queue_name, MyWorker, 'foo', 1, :bat => 'bar') + # Sidekiq::Client.enqueue_to(:queue_name, MyJob, 'foo', 1, :bat => 'bar') # def enqueue_to(queue, klass, *args) klass.client_push("queue" => queue, "class" => klass, "args" => args) end # Example usage: - # Sidekiq::Client.enqueue_to_in(:queue_name, 3.minutes, MyWorker, 'foo', 1, :bat => 'bar') + # Sidekiq::Client.enqueue_to_in(:queue_name, 3.minutes, MyJob, 'foo', 1, :bat => 'bar') # def enqueue_to_in(queue, interval, klass, *args) int = interval.to_f @@ -175,7 +175,7 @@ def enqueue_to_in(queue, interval, klass, *args) end # Example usage: - # Sidekiq::Client.enqueue_in(3.minutes, MyWorker, 'foo', 1, :bat => 'bar') + # Sidekiq::Client.enqueue_in(3.minutes, MyJob, 'foo', 1, :bat => 'bar') # def enqueue_in(interval, klass, *args) klass.perform_in(interval, *args) @@ -211,10 +211,10 @@ def atomic_push(conn, payloads) end end - def process_single(worker_class, item) + def process_single(job_class, item) queue = item["queue"] - middleware.invoke(worker_class, item, queue, @redis_pool) do + middleware.invoke(job_class, item, queue, @redis_pool) do item end end @@ -249,10 +249,10 @@ def normalize_item(item) def normalized_hash(item_class) if item_class.is_a?(Class) - raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item_class.ancestors.inspect}") unless item_class.respond_to?("get_sidekiq_options") + raise(ArgumentError, "Message must include a Sidekiq::Job class, not class name: #{item_class.ancestors.inspect}") unless item_class.respond_to?("get_sidekiq_options") item_class.get_sidekiq_options else - Sidekiq.default_worker_options + Sidekiq.default_job_options end end end diff --git a/lib/sidekiq/extensions/action_mailer.rb b/lib/sidekiq/extensions/action_mailer.rb index 444b82aa1..6fcdf4e0f 100644 --- a/lib/sidekiq/extensions/action_mailer.rb +++ b/lib/sidekiq/extensions/action_mailer.rb @@ -13,7 +13,7 @@ module Extensions # UserMailer.delay_for(5.days).send_welcome_email(new_user) # UserMailer.delay_until(5.days.from_now).send_welcome_email(new_user) class DelayedMailer - include Sidekiq::Worker + include Sidekiq::Job def perform(yml) (target, method_name, args) = YAML.load(yml) diff --git a/lib/sidekiq/extensions/active_record.rb b/lib/sidekiq/extensions/active_record.rb index 5342ed851..451daeb9e 100644 --- a/lib/sidekiq/extensions/active_record.rb +++ b/lib/sidekiq/extensions/active_record.rb @@ -15,7 +15,7 @@ module Extensions # object to Redis. Your Sidekiq jobs should pass IDs, not entire instances. # This is here for backwards compatibility with Delayed::Job only. class DelayedModel - include Sidekiq::Worker + include Sidekiq::Job def perform(yml) (target, method_name, args) = YAML.load(yml) diff --git a/lib/sidekiq/extensions/class_methods.rb b/lib/sidekiq/extensions/class_methods.rb index 1723b6f85..d27e523d6 100644 --- a/lib/sidekiq/extensions/class_methods.rb +++ b/lib/sidekiq/extensions/class_methods.rb @@ -13,7 +13,7 @@ module Extensions # Wikipedia.delay.download_changes_for(Date.today) # class DelayedClass - include Sidekiq::Worker + include Sidekiq::Job def perform(yml) (target, method_name, args) = YAML.load(yml) diff --git a/lib/sidekiq/job.rb b/lib/sidekiq/job.rb new file mode 100644 index 000000000..112cde7c6 --- /dev/null +++ b/lib/sidekiq/job.rb @@ -0,0 +1,244 @@ +# frozen_string_literal: true + +require "sidekiq/client" + +module Sidekiq + ## + # Include this module in your job class and you can easily create + # asynchronous jobs: + # + # class HardJob + # include Sidekiq::Job + # + # def perform(*args) + # # do some work + # end + # end + # + # Then in your Rails app, you can do this: + # + # HardJob.perform_async(1, 2, 3) + # + # Note that perform_async is a class method, perform is an instance method. + module Job + ## + # The Options module is extracted so we can include it in ActiveJob::Base + # and allow native AJs to configure Sidekiq features/internals. + module Options + def self.included(base) + base.extend(ClassMethods) + base.sidekiq_class_attribute :sidekiq_options_hash + base.sidekiq_class_attribute :sidekiq_retry_in_block + base.sidekiq_class_attribute :sidekiq_retries_exhausted_block + end + + module ClassMethods + ACCESSOR_MUTEX = Mutex.new + + ## + # Allows customization for this type of Job. + # Legal options: + # + # queue - name of queue to use for this job type, default *default* + # retry - enable retries for this Job in case of error during execution, + # *true* to use the default or *Integer* count + # backtrace - whether to save any error backtrace in the retry payload to display in web UI, + # can be true, false or an integer number of lines to save, default *false* + # + # In practice, any option is allowed. This is the main mechanism to configure the + # options for a specific job. + def sidekiq_options(opts = {}) + opts = opts.transform_keys(&:to_s) # stringify + self.sidekiq_options_hash = get_sidekiq_options.merge(opts) + end + + def sidekiq_retry_in(&block) + self.sidekiq_retry_in_block = block + end + + def sidekiq_retries_exhausted(&block) + self.sidekiq_retries_exhausted_block = block + end + + def get_sidekiq_options # :nodoc: + self.sidekiq_options_hash ||= Sidekiq.default_job_options + end + + def sidekiq_class_attribute(*attrs) + instance_reader = true + instance_writer = true + + attrs.each do |name| + synchronized_getter = "__synchronized_#{name}" + + singleton_class.instance_eval do + undef_method(name) if method_defined?(name) || private_method_defined?(name) + end + + define_singleton_method(synchronized_getter) { nil } + singleton_class.class_eval do + private(synchronized_getter) + end + + define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } } + + ivar = "@#{name}" + + singleton_class.instance_eval do + m = "#{name}=" + undef_method(m) if method_defined?(m) || private_method_defined?(m) + end + define_singleton_method("#{name}=") do |val| + singleton_class.class_eval do + ACCESSOR_MUTEX.synchronize do + undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter) + define_method(synchronized_getter) { val } + end + end + + if singleton_class? + class_eval do + undef_method(name) if method_defined?(name) || private_method_defined?(name) + define_method(name) do + if instance_variable_defined? ivar + instance_variable_get ivar + else + singleton_class.send name + end + end + end + end + val + end + + if instance_reader + undef_method(name) if method_defined?(name) || private_method_defined?(name) + define_method(name) do + if instance_variable_defined?(ivar) + instance_variable_get ivar + else + self.class.public_send name + end + end + end + + if instance_writer + m = "#{name}=" + undef_method(m) if method_defined?(m) || private_method_defined?(m) + attr_writer name + end + end + end + end + end + + attr_accessor :jid + + def self.included(base) + raise ArgumentError, "#{name} cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } + + base.include(Options) + base.extend(ClassMethods) + end + + def logger + Sidekiq.logger + end + + # This helper class encapsulates the set options for `set`, e.g. + # + # SomeJob.set(queue: 'foo').perform_async(....) + # + class Setter + def initialize(klass, opts) + @klass = klass + @opts = opts + end + + def set(options) + @opts.merge!(options) + self + end + + def perform_async(*args) + @klass.client_push(@opts.merge("args" => args, "class" => @klass)) + end + + # +interval+ must be a timestamp, numeric or something that acts + # numeric (like an activesupport time interval). + def perform_in(interval, *args) + int = interval.to_f + now = Time.now.to_f + ts = (int < 1_000_000_000 ? now + int : int) + + payload = @opts.merge("class" => @klass, "args" => args) + # Optimization to enqueue something now that is scheduled to go out now or in the past + payload["at"] = ts if ts > now + @klass.client_push(payload) + end + alias_method :perform_at, :perform_in + end + + module ClassMethods + def delay(*args) + raise ArgumentError, "Do not call .delay on a Sidekiq::Job class, call .perform_async" + end + + def delay_for(*args) + raise ArgumentError, "Do not call .delay_for on a Sidekiq::Job class, call .perform_in" + end + + def delay_until(*args) + raise ArgumentError, "Do not call .delay_until on a Sidekiq::Job class, call .perform_at" + end + + def set(options) + Setter.new(self, options) + end + + def perform_async(*args) + client_push("class" => self, "args" => args) + end + + # +interval+ must be a timestamp, numeric or something that acts + # numeric (like an activesupport time interval). + def perform_in(interval, *args) + int = interval.to_f + now = Time.now.to_f + ts = (int < 1_000_000_000 ? now + int : int) + + item = {"class" => self, "args" => args} + + # Optimization to enqueue something now that is scheduled to go out now or in the past + item["at"] = ts if ts > now + + client_push(item) + end + alias_method :perform_at, :perform_in + + ## + # Allows customization for this type of Job. + # Legal options: + # + # queue - use a named queue for this Job, default 'default' + # retry - enable the RetryJobs middleware for this Job, *true* to use the default + # or *Integer* count + # backtrace - whether to save any error backtrace in the retry payload to display in web UI, + # can be true, false or an integer number of lines to save, default *false* + # pool - use the given Redis connection pool to push this type of job to a given shard. + # + # In practice, any option is allowed. This is the main mechanism to configure the + # options for a specific job. + def sidekiq_options(opts = {}) + super + end + + def client_push(item) # :nodoc: + pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool + stringified_item = item.transform_keys(&:to_s) + + Sidekiq::Client.new(pool).push(stringified_item) + end + end + end +end diff --git a/lib/sidekiq/job_retry.rb b/lib/sidekiq/job_retry.rb index 552a75403..d4a9a8894 100644 --- a/lib/sidekiq/job_retry.rb +++ b/lib/sidekiq/job_retry.rb @@ -25,11 +25,11 @@ module Sidekiq # # A job looks like: # - # { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true } + # { 'class' => 'HardJob', 'args' => [1, 2, 'foo'], 'retry' => true } # # The 'retry' option also accepts a number (in place of 'true'): # - # { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 } + # { 'class' => 'HardJob', 'args' => [1, 2, 'foo'], 'retry' => 5 } # # The job will be retried this number of times before giving up. (If simply # 'true', Sidekiq retries 25 times) @@ -52,10 +52,10 @@ module Sidekiq # # Sidekiq.options[:max_retries] = 7 # - # or limit the number of retries for a particular worker with: + # or limit the number of retries for a particular job type with: # - # class MyWorker - # include Sidekiq::Worker + # class MyJob + # include Sidekiq::Job # sidekiq_options :retry => 10 # end # @@ -74,7 +74,7 @@ def initialize(options = {}) # The global retry handler requires only the barest of data. # We want to be able to retry as much as possible so we don't - # require the worker to be instantiated. + # require the job to be instantiated. def global(jobstr, queue) yield rescue Handled => ex @@ -101,14 +101,14 @@ def global(jobstr, queue) end # The local retry support means that any errors that occur within - # this block can be associated with the given worker instance. + # this block can be associated with the given job instance. # This is required to support the `sidekiq_retries_exhausted` block. # # Note that any exception from the block is wrapped in the Skip # exception so the global block does not reprocess the error. The # Skip exception is unwrapped within Sidekiq::Processor#process before # calling the handle_exception handlers. - def local(worker, jobstr, queue) + def local(job_instance, jobstr, queue) yield rescue Handled => ex raise ex @@ -121,11 +121,11 @@ def local(worker, jobstr, queue) msg = Sidekiq.load_json(jobstr) if msg["retry"].nil? - msg["retry"] = worker.class.get_sidekiq_options["retry"] + msg["retry"] = job_instance.class.get_sidekiq_options["retry"] end raise e unless msg["retry"] - attempt_retry(worker, msg, queue, e) + attempt_retry(job_instance, msg, queue, e) # We've handled this error associated with this job, don't # need to handle it at the global level raise Skip @@ -133,10 +133,10 @@ def local(worker, jobstr, queue) private - # Note that +worker+ can be nil here if an error is raised before we can - # instantiate the worker instance. All access must be guarded and + # Note that +job_instance+ can be nil here if an error is raised before we can + # instantiate the job class. All access must be guarded and # best effort. - def attempt_retry(worker, msg, queue, exception) + def attempt_retry(job_instance, msg, queue, exception) max_retry_attempts = retry_attempts_from(msg["retry"], @max_retries) msg["queue"] = (msg["retry_queue"] || queue) @@ -168,7 +168,7 @@ def attempt_retry(worker, msg, queue, exception) end if count < max_retry_attempts - delay = delay_for(worker, count, exception) + delay = delay_for(job_instance, count, exception) # Logging here can break retries if the logging device raises ENOSPC #3979 # logger.debug { "Failure! Retry #{count} in #{delay} seconds" } retry_at = Time.now.to_f + delay @@ -178,13 +178,13 @@ def attempt_retry(worker, msg, queue, exception) end else # Goodbye dear message, you (re)tried your best I'm sure. - retries_exhausted(worker, msg, exception) + retries_exhausted(job_instance, msg, exception) end end - def retries_exhausted(worker, msg, exception) + def retries_exhausted(job_instance, msg, exception) begin - block = worker&.sidekiq_retries_exhausted_block + block = job_instance&.sidekiq_retries_exhausted_block block&.call(msg, exception) rescue => e handle_exception(e, {context: "Error calling retries_exhausted", job: msg}) @@ -213,9 +213,9 @@ def retry_attempts_from(msg_retry, default) end end - def delay_for(worker, count, exception) - if worker&.sidekiq_retry_in_block - custom_retry_in = retry_in(worker, count, exception).to_i + def delay_for(job_instance, count, exception) + if job_instance&.sidekiq_retry_in_block + custom_retry_in = retry_in(job_instance, count, exception).to_i return custom_retry_in if custom_retry_in > 0 end seconds_to_delay(count) @@ -226,10 +226,10 @@ def seconds_to_delay(count) (count**4) + 15 + (rand(30) * (count + 1)) end - def retry_in(worker, count, exception) - worker.sidekiq_retry_in_block.call(count, exception) + def retry_in(job_instance, count, exception) + job_instance.sidekiq_retry_in_block.call(count, exception) rescue Exception => e - handle_exception(e, {context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{worker.class.name}, falling back to default"}) + handle_exception(e, {context: "Failure scheduling retry using the defined `sidekiq_retry_in` in #{job_instance.class.name}, falling back to default"}) nil end diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index ef6d8b32d..1fe4a7952 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -55,7 +55,7 @@ def stop @manager.stop(deadline) - # Requeue everything in case there was a worker who grabbed work while stopped + # Requeue everything in case there was a thread who grabbed work while stopped # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. strategy = @options[:fetch] strategy.bulk_requeue([], @options) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 35cec6c5c..47c7c3101 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -50,7 +50,7 @@ def quiet return if @done @done = true - logger.info { "Terminating quiet workers" } + logger.info { "Terminating quiet threads" } @workers.each { |x| x.terminate } fire_event(:quiet, reverse: true) end @@ -68,7 +68,7 @@ def stop(deadline) sleep PAUSE_TIME return if @workers.empty? - logger.info { "Pausing to allow workers to finish..." } + logger.info { "Pausing to allow jobs to finish..." } remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) while remaining > PAUSE_TIME return if @workers.empty? @@ -104,7 +104,7 @@ def stopped? private def hard_shutdown - # We've reached the timeout and we still have busy workers. + # We've reached the timeout and we still have busy threads. # They must die but their jobs shall live on. cleanup = nil @plock.synchronize do @@ -114,12 +114,12 @@ def hard_shutdown if cleanup.size > 0 jobs = cleanup.map { |p| p.job }.compact - logger.warn { "Terminating #{cleanup.size} busy worker threads" } + logger.warn { "Terminating #{cleanup.size} busy processor threads" } logger.warn { "Work still in progress #{jobs.inspect}" } # Re-enqueue unfinished jobs # NOTE: You may notice that we may push a job back to redis before - # the worker thread is terminated. This is ok because Sidekiq's + # the thread is terminated. This is ok because Sidekiq's # contract says that jobs are run AT LEAST once. Process termination # is delayed until we're certain the jobs are back in Redis because # it is worse to lose a job than to run it twice. diff --git a/lib/sidekiq/middleware/chain.rb b/lib/sidekiq/middleware/chain.rb index c039cb3f1..78482009d 100644 --- a/lib/sidekiq/middleware/chain.rb +++ b/lib/sidekiq/middleware/chain.rb @@ -44,7 +44,7 @@ module Sidekiq # This is an example of a minimal server middleware: # # class MyServerHook - # def call(worker_instance, msg, queue) + # def call(job_instance, msg, queue) # puts "Before work" # yield # puts "After work" @@ -56,7 +56,7 @@ module Sidekiq # to Redis: # # class MyClientHook - # def call(worker_class, msg, queue, redis_pool) + # def call(job_class, msg, queue, redis_pool) # puts "Before push" # result = yield # puts "After push" diff --git a/lib/sidekiq/middleware/i18n.rb b/lib/sidekiq/middleware/i18n.rb index 93c600257..2ad6367dd 100644 --- a/lib/sidekiq/middleware/i18n.rb +++ b/lib/sidekiq/middleware/i18n.rb @@ -10,7 +10,7 @@ module Sidekiq::Middleware::I18n # Get the current locale and store it in the message # to be sent to Sidekiq. class Client - def call(_worker, msg, _queue, _redis) + def call(_job, msg, _queue, _redis) msg["locale"] ||= I18n.locale yield end @@ -18,7 +18,7 @@ def call(_worker, msg, _queue, _redis) # Pull the msg locale out and set the current thread to use it. class Server - def call(_worker, msg, _queue, &block) + def call(_job, msg, _queue, &block) I18n.with_locale(msg.fetch("locale", I18n.default_locale), &block) end end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 39d3752c6..aa702dd59 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -11,7 +11,7 @@ module Sidekiq # # 1. fetches a job from Redis # 2. executes the job - # a. instantiate the Worker + # a. instantiate the job # b. run the middleware chain # c. call #perform # @@ -125,15 +125,15 @@ def dispatch(job_hash, queue, jobstr) @job_logger.call(job_hash, queue) do stats(jobstr, queue) do # Rails 5 requires a Reloader to wrap code execution. In order to - # constantize the worker and instantiate an instance, we have to call + # constantize the job class and instantiate an instance, we have to call # the Reloader. It handles code loading, db connection management, etc. # Effectively this block denotes a "unit of work" to Rails. @reloader.call do klass = constantize(job_hash["class"]) - worker = klass.new - worker.jid = job_hash["jid"] - @retrier.local(worker, jobstr, queue) do - yield worker + job = klass.new + job.jid = job_hash["jid"] + @retrier.local(job, jobstr, queue) do + yield job end end end @@ -159,9 +159,9 @@ def process(work) ack = false begin - dispatch(job_hash, queue, jobstr) do |worker| - Sidekiq.server_middleware.invoke(worker, job_hash, queue) do - execute_job(worker, job_hash["args"]) + dispatch(job_hash, queue, jobstr) do |job| + Sidekiq.server_middleware.invoke(job, job_hash, queue) do + execute_job(job, job_hash["args"]) end end ack = true @@ -192,8 +192,8 @@ def process(work) end end - def execute_job(worker, cloned_args) - worker.perform(*cloned_args) + def execute_job(job, cloned_args) + job.perform(*cloned_args) end # Ruby doesn't provide atomic counters out of the box so we'll diff --git a/lib/sidekiq/rails.rb b/lib/sidekiq/rails.rb index 5ac6b3dcb..d715dc2b8 100644 --- a/lib/sidekiq/rails.rb +++ b/lib/sidekiq/rails.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require "sidekiq/worker" +require "sidekiq/job" module Sidekiq class Rails < ::Rails::Engine @@ -33,7 +33,7 @@ def inspect # end initializer "sidekiq.active_job_integration" do ActiveSupport.on_load(:active_job) do - include ::Sidekiq::Worker::Options unless respond_to?(:sidekiq_options) + include ::Sidekiq::Job::Options unless respond_to?(:sidekiq_options) end end diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 9d4a2b534..6d25343b6 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -38,7 +38,7 @@ def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) # The Poller checks Redis every N seconds for jobs in the retry or scheduled # set have passed their timestamp and should be enqueued. If so, it # just pops the job back onto its original queue so the - # workers can pick it up like any other job. + # threads can pick it up like any other job. class Poller include Util @@ -164,9 +164,9 @@ def process_count end def initial_wait - # Have all processes sleep between 5-15 seconds. 10 seconds - # to give time for the heartbeat to register (if the poll interval is going to be calculated by the number - # of workers), and 5 random seconds to ensure they don't all hit Redis at the same time. + # Have all processes sleep between 5-15 seconds. 10 seconds to give time + # for the heartbeat to register (if the poll interval is going to be calculated by the number + # of threads), and 5 random seconds to ensure they don't all hit Redis at the same time. total = 0 total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average] total += (5 * rand) diff --git a/lib/sidekiq/testing.rb b/lib/sidekiq/testing.rb index 25c0ee281..d89647d86 100644 --- a/lib/sidekiq/testing.rb +++ b/lib/sidekiq/testing.rb @@ -101,20 +101,20 @@ module Queues ## # The Queues class is only for testing the fake queue implementation. # There are 2 data structures involved in tandem. This is due to the - # Rspec syntax of change(QueueWorker.jobs, :size). It keeps a reference + # Rspec syntax of change(QueueJob.jobs, :size). It keeps a reference # to the array. Because the array was dervied from a filter of the total # jobs enqueued, it appeared as though the array didn't change. # # To solve this, we'll keep 2 hashes containing the jobs. One with keys based - # on the queue, and another with keys of the worker names, so the array for - # QueueWorker.jobs is a straight reference to a real array. + # on the queue, and another with keys of the job names, so the array for + # QueueJob.jobs is a straight reference to a real array. # # Queue-based hash: # # { # "default"=>[ # { - # "class"=>"TestTesting::QueueWorker", + # "class"=>"TestTesting::QueueJob", # "args"=>[1, 2], # "retry"=>true, # "queue"=>"default", @@ -124,12 +124,12 @@ module Queues # ] # } # - # Worker-based hash: + # Job-based hash: # # { - # "TestTesting::QueueWorker"=>[ + # "TestTesting::QueueJob"=>[ # { - # "class"=>"TestTesting::QueueWorker", + # "class"=>"TestTesting::QueueJob", # "args"=>[1, 2], # "retry"=>true, # "queue"=>"default", @@ -144,14 +144,15 @@ module Queues # require 'sidekiq/testing' # # assert_equal 0, Sidekiq::Queues["default"].size - # HardWorker.perform_async(:something) + # HardJob.perform_async(:something) # assert_equal 1, Sidekiq::Queues["default"].size # assert_equal :something, Sidekiq::Queues["default"].first['args'][0] # - # You can also clear all workers' jobs: + # You can also clear all jobs: # # assert_equal 0, Sidekiq::Queues["default"].size - # HardWorker.perform_async(:something) + # HardJob.perform_async(:something) + # assert_equal 1, Sidekiq::Queues["default"].size # Sidekiq::Queues.clear_all # assert_equal 0, Sidekiq::Queues["default"].size # @@ -170,35 +171,36 @@ def [](queue) def push(queue, klass, job) jobs_by_queue[queue] << job - jobs_by_worker[klass] << job + jobs_by_type[klass] << job end def jobs_by_queue @jobs_by_queue ||= Hash.new { |hash, key| hash[key] = [] } end - def jobs_by_worker - @jobs_by_worker ||= Hash.new { |hash, key| hash[key] = [] } + def jobs_by_type + @jobs_by_type ||= Hash.new { |hash, key| hash[key] = [] } end + alias_method :jobs_by_worker, :jobs_by_type def delete_for(jid, queue, klass) jobs_by_queue[queue.to_s].delete_if { |job| job["jid"] == jid } - jobs_by_worker[klass].delete_if { |job| job["jid"] == jid } + jobs_by_type[klass].delete_if { |job| job["jid"] == jid } end def clear_for(queue, klass) jobs_by_queue[queue].clear - jobs_by_worker[klass].clear + jobs_by_type[klass].clear end def clear_all jobs_by_queue.clear - jobs_by_worker.clear + jobs_by_type.clear end end end - module Worker + module Job ## # The Sidekiq testing infrastructure overrides perform_async # so that it does not actually touch the network. Instead it @@ -212,16 +214,16 @@ module Worker # # require 'sidekiq/testing' # - # assert_equal 0, HardWorker.jobs.size - # HardWorker.perform_async(:something) - # assert_equal 1, HardWorker.jobs.size - # assert_equal :something, HardWorker.jobs[0]['args'][0] + # assert_equal 0, HardJob.jobs.size + # HardJob.perform_async(:something) + # assert_equal 1, HardJob.jobs.size + # assert_equal :something, HardJob.jobs[0]['args'][0] # # assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size # MyMailer.delay.send_welcome_email('foo@example.com') # assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size # - # You can also clear and drain all workers' jobs: + # You can also clear and drain all jobs: # # assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size # assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size @@ -232,7 +234,7 @@ module Worker # assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size # assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size # - # Sidekiq::Worker.clear_all # or .drain_all + # Sidekiq::Job.clear_all # or .drain_all # # assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size # assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size @@ -241,36 +243,36 @@ module Worker # # RSpec.configure do |config| # config.before(:each) do - # Sidekiq::Worker.clear_all + # Sidekiq::Job.clear_all # end # end # # or for acceptance testing, i.e. with cucumber: # # AfterStep do - # Sidekiq::Worker.drain_all + # Sidekiq::Job.drain_all # end # # When I sign up as "foo@example.com" # Then I should receive a welcome email to "foo@example.com" # module ClassMethods - # Queue for this worker + # Queue for this job type def queue get_sidekiq_options["queue"] end - # Jobs queued for this worker + # Jobs queued for this job type def jobs - Queues.jobs_by_worker[to_s] + Queues.jobs_by_type[to_s] end - # Clear all jobs for this worker + # Clear all jobs for this job type def clear Queues.clear_for(queue, to_s) end - # Drain and run all jobs for this worker + # Drain and run all jobs for this job type def drain while jobs.any? next_job = jobs.first @@ -287,17 +289,17 @@ def perform_one process_job(next_job) end - def process_job(job) - worker = new - worker.jid = job["jid"] - worker.bid = job["bid"] if worker.respond_to?(:bid=) - Sidekiq::Testing.server_middleware.invoke(worker, job, job["queue"]) do - execute_job(worker, job["args"]) + def process_job(job_hash) + job = new + job.jid = job_hash["jid"] + job.bid = job_hash["bid"] if job.respond_to?(:bid=) + Sidekiq::Testing.server_middleware.invoke(job, job_hash, job_hash["queue"]) do + execute_job(job, job_hash["args"]) end end - def execute_job(worker, args) - worker.perform(*args) + def execute_job(job, args) + job.perform(*args) end end @@ -306,18 +308,18 @@ def jobs # :nodoc: Queues.jobs_by_queue.values.flatten end - # Clear all queued jobs across all workers + # Clear all queued jobs across all job types def clear_all Queues.clear_all end - # Drain all queued jobs across all workers + # Drain all queued jobs across all job types def drain_all while jobs.any? - worker_classes = jobs.map { |job| job["class"] }.uniq + job_classes = jobs.map { |job| job["class"] }.uniq - worker_classes.each do |worker_class| - Sidekiq::Testing.constantize(worker_class).drain + job_classes.each do |job_class| + Sidekiq::Testing.constantize(job_class).drain end end end diff --git a/lib/sidekiq/testing/inline.rb b/lib/sidekiq/testing/inline.rb index d83e929c4..44548b8ea 100644 --- a/lib/sidekiq/testing/inline.rb +++ b/lib/sidekiq/testing/inline.rb @@ -4,7 +4,7 @@ ## # The Sidekiq inline infrastructure overrides perform_async so that it -# actually calls perform instead. This allows workers to be run inline in a +# actually calls perform instead. This allows jobs to be run inline in a # testing environment. # # This is similar to `Resque.inline = true` functionality. @@ -15,8 +15,8 @@ # # $external_variable = 0 # -# class ExternalWorker -# include Sidekiq::Worker +# class ExternalJob +# include Sidekiq::Job # # def perform # $external_variable = 1 @@ -24,7 +24,7 @@ # end # # assert_equal 0, $external_variable -# ExternalWorker.perform_async +# ExternalJob.perform_async # assert_equal 1, $external_variable # Sidekiq::Testing.inline! diff --git a/lib/sidekiq/web/application.rb b/lib/sidekiq/web/application.rb index 1d2f23727..9d17cb9d3 100644 --- a/lib/sidekiq/web/application.rb +++ b/lib/sidekiq/web/application.rb @@ -92,7 +92,7 @@ def self.set(key, val) @count = (params["count"] || 25).to_i @queue = Sidekiq::Queue.new(@name) (@current_page, @total_size, @messages) = page("queue:#{@name}", params["page"], @count, reverse: params["direction"] == "asc") - @messages = @messages.map { |msg| Sidekiq::Job.new(msg, @name) } + @messages = @messages.map { |msg| Sidekiq::JobRecord.new(msg, @name) } erb(:queue) end @@ -113,7 +113,7 @@ def self.set(key, val) post "/queues/:name/delete" do name = route_params[:name] - Sidekiq::Job.new(params["key_val"], name).delete + Sidekiq::JobRecord.new(params["key_val"], name).delete redirect_with_query("#{root_path}queues/#{CGI.escape(name)}") end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index ea2c879ed..8f0ef39ac 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -1,244 +1,5 @@ -# frozen_string_literal: true - -require "sidekiq/client" +require "sidekiq/job" module Sidekiq - ## - # Include this module in your worker class and you can easily create - # asynchronous jobs: - # - # class HardWorker - # include Sidekiq::Worker - # - # def perform(*args) - # # do some work - # end - # end - # - # Then in your Rails app, you can do this: - # - # HardWorker.perform_async(1, 2, 3) - # - # Note that perform_async is a class method, perform is an instance method. - module Worker - ## - # The Options module is extracted so we can include it in ActiveJob::Base - # and allow native AJs to configure Sidekiq features/internals. - module Options - def self.included(base) - base.extend(ClassMethods) - base.sidekiq_class_attribute :sidekiq_options_hash - base.sidekiq_class_attribute :sidekiq_retry_in_block - base.sidekiq_class_attribute :sidekiq_retries_exhausted_block - end - - module ClassMethods - ACCESSOR_MUTEX = Mutex.new - - ## - # Allows customization for this type of Worker. - # Legal options: - # - # queue - name of queue to use for this job type, default *default* - # retry - enable retries for this Worker in case of error during execution, - # *true* to use the default or *Integer* count - # backtrace - whether to save any error backtrace in the retry payload to display in web UI, - # can be true, false or an integer number of lines to save, default *false* - # - # In practice, any option is allowed. This is the main mechanism to configure the - # options for a specific job. - def sidekiq_options(opts = {}) - opts = opts.transform_keys(&:to_s) # stringify - self.sidekiq_options_hash = get_sidekiq_options.merge(opts) - end - - def sidekiq_retry_in(&block) - self.sidekiq_retry_in_block = block - end - - def sidekiq_retries_exhausted(&block) - self.sidekiq_retries_exhausted_block = block - end - - def get_sidekiq_options # :nodoc: - self.sidekiq_options_hash ||= Sidekiq.default_worker_options - end - - def sidekiq_class_attribute(*attrs) - instance_reader = true - instance_writer = true - - attrs.each do |name| - synchronized_getter = "__synchronized_#{name}" - - singleton_class.instance_eval do - undef_method(name) if method_defined?(name) || private_method_defined?(name) - end - - define_singleton_method(synchronized_getter) { nil } - singleton_class.class_eval do - private(synchronized_getter) - end - - define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } } - - ivar = "@#{name}" - - singleton_class.instance_eval do - m = "#{name}=" - undef_method(m) if method_defined?(m) || private_method_defined?(m) - end - define_singleton_method("#{name}=") do |val| - singleton_class.class_eval do - ACCESSOR_MUTEX.synchronize do - undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter) - define_method(synchronized_getter) { val } - end - end - - if singleton_class? - class_eval do - undef_method(name) if method_defined?(name) || private_method_defined?(name) - define_method(name) do - if instance_variable_defined? ivar - instance_variable_get ivar - else - singleton_class.send name - end - end - end - end - val - end - - if instance_reader - undef_method(name) if method_defined?(name) || private_method_defined?(name) - define_method(name) do - if instance_variable_defined?(ivar) - instance_variable_get ivar - else - self.class.public_send name - end - end - end - - if instance_writer - m = "#{name}=" - undef_method(m) if method_defined?(m) || private_method_defined?(m) - attr_writer name - end - end - end - end - end - - attr_accessor :jid - - def self.included(base) - raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } - - base.include(Options) - base.extend(ClassMethods) - end - - def logger - Sidekiq.logger - end - - # This helper class encapsulates the set options for `set`, e.g. - # - # SomeWorker.set(queue: 'foo').perform_async(....) - # - class Setter - def initialize(klass, opts) - @klass = klass - @opts = opts - end - - def set(options) - @opts.merge!(options) - self - end - - def perform_async(*args) - @klass.client_push(@opts.merge("args" => args, "class" => @klass)) - end - - # +interval+ must be a timestamp, numeric or something that acts - # numeric (like an activesupport time interval). - def perform_in(interval, *args) - int = interval.to_f - now = Time.now.to_f - ts = (int < 1_000_000_000 ? now + int : int) - - payload = @opts.merge("class" => @klass, "args" => args) - # Optimization to enqueue something now that is scheduled to go out now or in the past - payload["at"] = ts if ts > now - @klass.client_push(payload) - end - alias_method :perform_at, :perform_in - end - - module ClassMethods - def delay(*args) - raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async" - end - - def delay_for(*args) - raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in" - end - - def delay_until(*args) - raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at" - end - - def set(options) - Setter.new(self, options) - end - - def perform_async(*args) - client_push("class" => self, "args" => args) - end - - # +interval+ must be a timestamp, numeric or something that acts - # numeric (like an activesupport time interval). - def perform_in(interval, *args) - int = interval.to_f - now = Time.now.to_f - ts = (int < 1_000_000_000 ? now + int : int) - - item = {"class" => self, "args" => args} - - # Optimization to enqueue something now that is scheduled to go out now or in the past - item["at"] = ts if ts > now - - client_push(item) - end - alias_method :perform_at, :perform_in - - ## - # Allows customization for this type of Worker. - # Legal options: - # - # queue - use a named queue for this Worker, default 'default' - # retry - enable the RetryJobs middleware for this Worker, *true* to use the default - # or *Integer* count - # backtrace - whether to save any error backtrace in the retry payload to display in web UI, - # can be true, false or an integer number of lines to save, default *false* - # pool - use the given Redis connection pool to push this type of job to a given shard. - # - # In practice, any option is allowed. This is the main mechanism to configure the - # options for a specific job. - def sidekiq_options(opts = {}) - super - end - - def client_push(item) # :nodoc: - pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool - stringified_item = item.transform_keys(&:to_s) - - Sidekiq::Client.new(pool).push(stringified_item) - end - end - end + Worker = Job end diff --git a/test/test_api.rb b/test/test_api.rb index 2392a6f20..d92da85c1 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -308,7 +308,7 @@ class WorkerWithTags it "unwraps ActiveJob #{ver} jobs" do #ApiJob.perform_later(1,2,3) #puts Sidekiq::Queue.new.first.value - x = Sidekiq::Job.new(jobs[0], "default") + x = Sidekiq::JobRecord.new(jobs[0], "default") assert_equal ApiJob.name, x.display_class assert_equal [1,2,3], x.display_args end @@ -316,7 +316,7 @@ class WorkerWithTags it "unwraps ActionMailer #{ver} jobs" do #ApiMailer.test_email(1,2,3).deliver_later #puts Sidekiq::Queue.new("mailers").first.value - x = Sidekiq::Job.new(jobs[1], "mailers") + x = Sidekiq::JobRecord.new(jobs[1], "mailers") assert_equal "#{ApiMailer.name}#test_email", x.display_class assert_equal [1,2,3], x.display_args end diff --git a/test/test_client.rb b/test/test_client.rb index f208ece02..dcbb1d2af 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -100,17 +100,17 @@ def call(worker_klass,msg,q,r) end class MyWorker - include Sidekiq::Worker + include Sidekiq::Job end class QueuedWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_options :queue => :flimflam end it 'enqueues' do Sidekiq.redis {|c| c.flushdb } - assert_equal Sidekiq.default_worker_options, MyWorker.get_sidekiq_options + assert_equal Sidekiq.default_job_options, MyWorker.get_sidekiq_options assert MyWorker.perform_async(1, 2) assert Sidekiq::Client.enqueue(MyWorker, 1, 2) assert Sidekiq::Client.enqueue_to(:custom_queue, MyWorker, 1, 2) @@ -181,7 +181,7 @@ class QueuedWorker end class BaseWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_options 'retry' => 'base' end class AWorker < BaseWorker @@ -284,7 +284,7 @@ class DWorker < BaseWorker describe 'class attribute race conditions' do new_class = -> { Class.new do - class_eval('include Sidekiq::Worker') + class_eval('include Sidekiq::Job') define_method(:foo) { get_sidekiq_options } end diff --git a/test/test_rails.rb b/test/test_rails.rb index c5051ea2c..6d8af7e52 100644 --- a/test/test_rails.rb +++ b/test/test_rails.rb @@ -9,19 +9,19 @@ # need to force this since we aren't booting a Rails app ActiveJob::Base.queue_adapter = :sidekiq ActiveJob::Base.logger = nil - ActiveJob::Base.send(:include, ::Sidekiq::Worker::Options) unless ActiveJob::Base.respond_to?(:sidekiq_options) + ActiveJob::Base.send(:include, ::Sidekiq::Job::Options) unless ActiveJob::Base.respond_to?(:sidekiq_options) end - it 'does not allow Sidekiq::Worker in AJ::Base classes' do + it 'does not allow Sidekiq::Job in AJ::Base classes' do ex = assert_raises ArgumentError do Class.new(ActiveJob::Base) do - include Sidekiq::Worker + include Sidekiq::Job end end - assert_includes ex.message, "Sidekiq::Worker cannot be included" + assert_includes ex.message, "Sidekiq::Job cannot be included" end - it 'loads Sidekiq::Worker::Options in AJ::Base classes' do + it 'loads Sidekiq::Job::Options in AJ::Base classes' do aj = Class.new(ActiveJob::Base) do queue_as :bar sidekiq_options retry: 4, queue: 'foo', backtrace: 5 diff --git a/test/test_sidekiq.rb b/test/test_sidekiq.rb index 7a1c62565..f440d2bf4 100644 --- a/test/test_sidekiq.rb +++ b/test/test_sidekiq.rb @@ -48,14 +48,14 @@ end end - describe 'default_worker_options' do + describe 'default_job_options' do it 'stringifies keys' do - @old_options = Sidekiq.default_worker_options + @old_options = Sidekiq.default_job_options begin - Sidekiq.default_worker_options = { queue: 'cat'} - assert_equal 'cat', Sidekiq.default_worker_options['queue'] + Sidekiq.default_job_options = { queue: 'cat'} + assert_equal 'cat', Sidekiq.default_job_options['queue'] ensure - Sidekiq.default_worker_options = @old_options + Sidekiq.default_job_options = @old_options end end end diff --git a/test/test_testing.rb b/test/test_testing.rb index e43aa4291..180409d02 100644 --- a/test/test_testing.rb +++ b/test/test_testing.rb @@ -89,7 +89,7 @@ end class AttributeWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_class_attribute :count self.count = 0 attr_accessor :foo diff --git a/test/test_testing_fake.rb b/test/test_testing_fake.rb index 36b402187..8348ef2e6 100644 --- a/test/test_testing_fake.rb +++ b/test/test_testing_fake.rb @@ -5,21 +5,21 @@ class PerformError < RuntimeError; end class DirectWorker - include Sidekiq::Worker + include Sidekiq::Job def perform(a, b) a + b end end class EnqueuedWorker - include Sidekiq::Worker + include Sidekiq::Job def perform(a, b) a + b end end class StoredWorker - include Sidekiq::Worker + include Sidekiq::Job def perform(error) raise PerformError if error end @@ -119,7 +119,7 @@ def foo(str) end class SpecificJidWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_class_attribute :count self.count = 0 def perform(worker_jid) @@ -174,7 +174,7 @@ def perform(worker_jid) end class FirstWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_class_attribute :count self.count = 0 def perform @@ -183,7 +183,7 @@ def perform end class SecondWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_class_attribute :count self.count = 0 def perform @@ -192,7 +192,7 @@ def perform end class ThirdWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_class_attribute :count def perform FirstWorker.perform_async @@ -297,14 +297,14 @@ def perform end class QueueWorker - include Sidekiq::Worker + include Sidekiq::Job def perform(a, b) a + b end end class AltQueueWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_options queue: :alt def perform(a, b) a + b diff --git a/test/test_testing_inline.rb b/test/test_testing_inline.rb index f3cb4b118..1fb7ca8cc 100644 --- a/test/test_testing_inline.rb +++ b/test/test_testing_inline.rb @@ -6,7 +6,7 @@ class InlineError < RuntimeError; end class ParameterIsNotString < RuntimeError; end class InlineWorker - include Sidekiq::Worker + include Sidekiq::Job def perform(pass) raise ArgumentError, "no jid" unless jid raise InlineError unless pass @@ -14,7 +14,7 @@ def perform(pass) end class InlineWorkerWithTimeParam - include Sidekiq::Worker + include Sidekiq::Job def perform(time) raise ParameterIsNotString unless time.is_a?(String) || time.is_a?(Numeric) end diff --git a/test/test_web.rb b/test/test_web.rb index cb8b88beb..9cdb888e7 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -22,7 +22,7 @@ def job_params(job, score) end class WebWorker - include Sidekiq::Worker + include Sidekiq::Job def perform(a, b) a + b diff --git a/test/test_worker.rb b/test/test_worker.rb index 0a3542a2f..62f7fd11a 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -4,7 +4,7 @@ describe '#set' do class SetWorker - include Sidekiq::Worker + include Sidekiq::Job sidekiq_options :queue => :foo, 'retry' => 12 end diff --git a/test/test_worker_generator.rb b/test/test_worker_generator.rb index dbb60ad10..a79b8a2f7 100644 --- a/test/test_worker_generator.rb +++ b/test/test_worker_generator.rb @@ -11,17 +11,17 @@ class WorkerGeneratorTest < Rails::Generators::TestCase test 'all files are properly created' do run_generator ['foo'] - assert_file 'app/workers/foo_worker.rb' - assert_file 'test/workers/foo_worker_test.rb' + assert_file 'app/workers/foo_job.rb' + assert_file 'test/workers/foo_job_test.rb' end test 'gracefully handles extra worker suffix' do - run_generator ['foo_worker'] - assert_no_file 'app/workers/foo_worker_worker.rb' - assert_no_file 'test/workers/foo_worker_worker_test.rb' + run_generator ['foo_job'] + assert_no_file 'app/workers/foo_job_job.rb' + assert_no_file 'test/workers/foo_job_job_test.rb' - assert_file 'app/workers/foo_worker.rb' - assert_file 'test/workers/foo_worker_test.rb' + assert_file 'app/workers/foo_job.rb' + assert_file 'test/workers/foo_job_test.rb' end test 'respects rails config test_framework option' do @@ -31,8 +31,8 @@ class WorkerGeneratorTest < Rails::Generators::TestCase run_generator ['foo'] - assert_file 'app/workers/foo_worker.rb' - assert_no_file 'test/workers/foo_worker_test.rb' + assert_file 'app/workers/foo_job.rb' + assert_no_file 'test/workers/foo_job_test.rb' ensure Rails.application.config.generators do |g| g.test_framework :test_case @@ -46,8 +46,8 @@ class WorkerGeneratorTest < Rails::Generators::TestCase run_generator ['foo'] - assert_file 'app/workers/foo_worker.rb' - assert_file 'spec/workers/foo_worker_spec.rb' + assert_file 'app/workers/foo_job.rb' + assert_file 'spec/workers/foo_job_spec.rb' ensure Rails.application.config.generators do |g| g.test_framework :test_case diff --git a/web/views/busy.erb b/web/views/busy.erb index 1fac403fe..c4f309253 100644 --- a/web/views/busy.erb +++ b/web/views/busy.erb @@ -110,7 +110,7 @@ <%= t('Started') %> <% workers.each do |process, thread, msg| %> - <% job = Sidekiq::Job.new(msg['payload']) %> + <% job = Sidekiq::JobRecord.new(msg['payload']) %> <%= process %> <%= thread %>