Skip to content

Commit

Permalink
fix(digest): write digest on middleware call (#774)
Browse files Browse the repository at this point in the history
* fix(digest): write digest on middleware call

* chore: prefer Sidekiq::Job everywhree

* chore(lint): lint'em real good

* fix(ci): improve reliability of test on linux

* chore(ci): test against newer versions

* chore(lint): lint'em real good

* fix(unlock): make cleanup more aggressive

* fix: prevent time helper conflicts

Closes #790
  • Loading branch information
mhenrixon committed Feb 7, 2024
1 parent 7d5e40a commit 63e9431
Show file tree
Hide file tree
Showing 32 changed files with 193 additions and 162 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ jobs:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.1
bundler: 2.4.12
ruby-version: 3.2
bundler-cache: true
- run: bin/bundle --jobs=$(nproc) --retry=$(nproc)
- run: bin/rubocop -P
6 changes: 3 additions & 3 deletions .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ jobs:
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.2
bundler: 2.4.12
bundler-cache: true

- name: Install Code Climate reporter
Expand Down Expand Up @@ -59,16 +58,17 @@ jobs:
strategy:
fail-fast: true
matrix:
ruby: [2.7, '3.0', 3.1, 3.2]
ruby: ["2.7", '3.0', "3.1", "3.2", "3.3"]
gemfile:
- sidekiq_7.0
- sidekiq_7.1
- sidekiq_7.2

steps:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
bundler: 2.4.12
bundler-cache: true
- run: >-
REDIS_HOST=localhost
Expand Down
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Layout/EndAlignment:
Layout/LineContinuationLeadingSpace:
Enabled: false

Layout/MultilineMethodCallIndentation:
EnforcedStyle: indented

Lint/AmbiguousBlockAssociation:
Exclude:
- spec/**/*
Expand Down
8 changes: 8 additions & 0 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,11 @@
appraise "sidekiq-7.0" do
gem "sidekiq", "~> 7.0.0"
end

appraise "sidekiq-7.1" do
gem "sidekiq", "~> 7.1.0"
end

appraise "sidekiq-7.2" do
gem "sidekiq", "~> 7.2.0"
end
28 changes: 28 additions & 0 deletions gemfiles/sidekiq_7.1.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This file was generated by Appraisal

source "https://rubygems.org"

gem "appraisal"
gem "faraday-retry"
gem "gem-release"
gem "github-markup"
gem "rack-test"
gem "rake", "13.0.3"
gem "reek", ">= 5.3"
gem "rspec"
gem "rspec-benchmark"
gem "rspec-html-matchers"
gem "rspec-its"
gem "rubocop-mhenrixon"
gem "simplecov-sublime", ">= 0.21.2", require: false
gem "sinatra"
gem "timecop"
gem "toxiproxy"
gem "yard"
gem "sidekiq", "~> 7.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
end

gemspec path: "../"
28 changes: 28 additions & 0 deletions gemfiles/sidekiq_7.2.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This file was generated by Appraisal

source "https://rubygems.org"

gem "appraisal"
gem "faraday-retry"
gem "gem-release"
gem "github-markup"
gem "rack-test"
gem "rake", "13.0.3"
gem "reek", ">= 5.3"
gem "rspec"
gem "rspec-benchmark"
gem "rspec-html-matchers"
gem "rspec-its"
gem "rubocop-mhenrixon"
gem "simplecov-sublime", ">= 0.21.2", require: false
gem "sinatra"
gem "timecop"
gem "toxiproxy"
gem "yard"
gem "sidekiq", "~> 7.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
end

gemspec path: "../"
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ class Config < ThreadSafeConfig
# @return [Hash<Symbol, SidekiqUniqueJobs::Lock::BaseLock] all available default locks
LOCKS =
LOCKS_WHEN_BUSY.dup
.merge(LOCKS_WHILE_ENQUEUED.dup)
.merge(LOCKS_WITHOUT_UNLOCK.dup)
.merge(LOCKS_FROM_PUSH_TO_PROCESSED.dup)
.freeze
.merge(LOCKS_WHILE_ENQUEUED.dup)
.merge(LOCKS_WITHOUT_UNLOCK.dup)
.merge(LOCKS_FROM_PUSH_TO_PROCESSED.dup)
.freeze

#
# @return [Hash<Symbol, SidekiqUniqueJobs::OnConflict::Strategy] all available default strategies
Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ def add_lock_ttl(item)
end

def add_lock_timeout(item)
item[LOCK_TIMEOUT] ||= SidekiqUniqueJobs::LockTimeout.calculate(item)
item[LOCK_TIMEOUT] = SidekiqUniqueJobs::LockTimeout.calculate(item)
end

def add_lock_args(item)
item[LOCK_ARGS] ||= SidekiqUniqueJobs::LockArgs.call(item)
item[LOCK_ARGS] = SidekiqUniqueJobs::LockArgs.call(item)
end

def add_lock_digest(item)
item[LOCK_DIGEST] ||= SidekiqUniqueJobs::LockDigest.call(item)
item[LOCK_DIGEST] = SidekiqUniqueJobs::LockDigest.call(item)
end

def add_lock_prefix(item)
item[LOCK_PREFIX] ||= SidekiqUniqueJobs.config.lock_prefix
item[LOCK_PREFIX] = SidekiqUniqueJobs.config.lock_prefix
end

def add_lock_type(item)
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ def primed_async(conn, wait = nil, &block) # rubocop:disable Metrics/MethodLengt

# NOTE: When debugging, change .value to .value!
primed_jid = Concurrent::Promises
.future(conn) { |red_con| pop_queued(red_con, timeout) }
.value
.future(conn) { |red_con| pop_queued(red_con, timeout) }
.value

handle_primed(primed_jid, &block)
end
Expand Down
25 changes: 16 additions & 9 deletions lib/sidekiq_unique_jobs/lua/unlock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,28 @@ if lock_type ~= "until_expired" then
redis.call("HDEL", locked, job_id)
end

if redis.call("LLEN", primed) == 0 then
log_debug("UNLINK", primed)
redis.call("UNLINK", primed)
end

local locked_count = redis.call("HLEN", locked)

if locked_count and locked_count < 1 then
if locked_count < 1 then
log_debug("UNLINK", locked)
redis.call("UNLINK", locked)
end

if redis.call("LLEN", primed) == 0 then
log_debug("UNLINK", primed)
redis.call("UNLINK", primed)
end

if limit and limit <= 1 and locked_count and locked_count <= 1 then
log_debug("ZREM", digests, digest)
redis.call("ZREM", digests, digest)
if limit then
if limit <= 1 and locked_count <= 1 then
log_debug("ZREM", digests, digest)
redis.call("ZREM", digests, digest)
end
else
if locked_count <= 1 then
log_debug("ZREM", digests, digest)
redis.call("ZREM", digests, digest)
end
end

log_debug("LPUSH", queued, "1")
Expand Down
10 changes: 5 additions & 5 deletions lib/sidekiq_unique_jobs/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ module Middleware
# This method runs before (prepended) the actual middleware implementation.
# This is done to reduce duplication
#
# @param [Sidekiq::Worker] worker_class
# @param [Sidekiq::Job] worker_class
# @param [Hash] item a sidekiq job hash
# @param [String] queue name of the queue
# @param [ConnectionPool] redis_pool only used for compatility reasons
#
# @return [yield<super>] <description>
# @return [yield<super>] call the rest of the middleware stack
#
# @yieldparam [<type>] if <description>
# @yieldreturn [<type>] <describe what yield should return>
# @yieldparam [void] if uniquejobs is disable
# @yieldreturn [void] delegate back to other sidekiq middleware
def call(worker_class, item, queue, redis_pool = nil)
@item = item
@queue = queue
@redis_pool = redis_pool
self.job_class = worker_class
return yield if unique_disabled?

SidekiqUniqueJobs::Job.prepare(item) unless item[LOCK_DIGEST]
SidekiqUniqueJobs::Job.prepare(item)

with_logging_context do
super
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/middleware/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Middleware
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
class Client
include Sidekiq::ClientMiddleware if defined?(Sidekiq::ClientMiddleware)
include Sidekiq::ClientMiddleware

# prepend "SidekiqUniqueJobs::Middleware"
# @!parse prepends SidekiqUniqueJobs::Middleware
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/middleware/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Middleware
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
class Server
include Sidekiq::ServerMiddleware if defined?(Sidekiq::ServerMiddleware)
include Sidekiq::ServerMiddleware

# prepend "SidekiqUniqueJobs::Middleware"
# @!parse prepends SidekiqUniqueJobs::Middleware
Expand Down
48 changes: 13 additions & 35 deletions lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,46 +68,24 @@ def delete(score, job_id)
prepend UniqueExtension
end

if Sidekiq.const_defined?(:JobRecord)
# See Sidekiq::Api
class JobRecord
#
# Provides extensions for unlocking jobs that are removed and deleted
# See Sidekiq::Api
class JobRecord
#
# Provides extensions for unlocking jobs that are removed and deleted
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
#
module UniqueExtension
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
# Wraps the original method to ensure locks for the job are deleted
#
module UniqueExtension
#
# Wraps the original method to ensure locks for the job are deleted
#
def delete
SidekiqUniqueJobs::Unlockable.delete!(item)
super
end
def delete
SidekiqUniqueJobs::Unlockable.delete!(item)
super
end

prepend UniqueExtension
end
else
# See Sidekiq::Api
class Job
#
# Provides extensions for unlocking jobs that are removed and deleted
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
#
module UniqueExtension
#
# Wraps the original method to ensure locks for the job are deleted
#
def delete
SidekiqUniqueJobs::Unlockable.delete!(item)
super
end
end

prepend UniqueExtension
end
prepend UniqueExtension
end

# See Sidekiq::Api
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq_unique_jobs/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ def validate_worker!(options)
# Attempt to constantize a string worker_class argument, always
# failing back to the original argument when the constant can't be found
#
# @return [Sidekiq::Worker]
# @return [Sidekiq::Job]
def constantize(str)
return str.class if str.is_a?(Sidekiq::Worker) # sidekiq v6.x
return str.class if str.is_a?(Sidekiq::Job) # sidekiq v6.x
return str unless str.is_a?(String)
return Object.const_get(str) unless str.include?("::")

Expand All @@ -269,7 +269,7 @@ def constantize(str)
# Attempt to constantize a string worker_class argument, always
# failing back to the original argument when the constant can't be found
#
# @return [Sidekiq::Worker, String]
# @return [Sidekiq::Job, String]
def safe_constantize(str)
constantize(str)
rescue NameError => ex
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq_unique_jobs/sidekiq_worker_methods.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# Module with convenience methods for the Sidekiq::Worker class
# Module with convenience methods for the Sidekiq::Job class
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
module SidekiqWorkerMethods
#
# @!attribute [r] job_class
# @return [Sidekiq::Worker] The Sidekiq::Worker implementation
# @return [Sidekiq::Job] The Sidekiq::Job implementation
attr_reader :job_class

# Avoids duplicating worker_class.respond_to? in multiple places
Expand Down Expand Up @@ -62,7 +62,7 @@ def after_unlock_hook # rubocop:disable Metrics/MethodLength
# Attempt to constantize a string worker_class argument, always
# failing back to the original argument when the constant can't be found
#
# @return [Sidekiq::Worker]
# @return [Sidekiq::Job]
def job_class_constantize(klazz = @job_class)
SidekiqUniqueJobs.safe_constantize(klazz)
end
Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ def self.use_options(tmp_config = {}) # rubocop:disable Metrics/MethodLength
end

#
# See Sidekiq::Worker in Sidekiq gem for more details
# See Sidekiq::Job in Sidekiq gem for more details
#
module Worker
#
# Adds class methods to Sidekiq::Worker
# Adds class methods to Sidekiq::Job
#
module ClassMethods
#
Expand Down Expand Up @@ -110,14 +110,14 @@ def clear
prepend Overrides

#
# Prepends methods to Sidekiq::Worker
# Prepends methods to Sidekiq::Job
#
module ClassMethods
prepend Overrides::ClassMethods
end

#
# Prepends singleton methods to Sidekiq::Worker
# Prepends singleton methods to Sidekiq::Job
#
module SignletonOverrides
#
Expand Down

0 comments on commit 63e9431

Please sign in to comment.