Skip to content

Commit

Permalink
add unique payloads support, addresses issue #4
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanlecompte committed Feb 7, 2012
1 parent 2d42f3d commit 62045af
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 4 deletions.
20 changes: 19 additions & 1 deletion lib/sidekiq/client.rb
@@ -1,9 +1,14 @@
require 'multi_json'
require 'redis'
require 'base64'

module Sidekiq
class Client

class << self
attr_accessor :push_unique_only
end

def self.redis
@redis ||= begin
# autoconfig for Heroku
Expand All @@ -24,7 +29,16 @@ def self.push(queue='default', item)
raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']

item['class'] = item['class'].to_s if !item['class'].is_a?(String)
redis.rpush("queue:#{queue}", MultiJson.encode(item))
queue_key = "queue:#{queue}"
encoded_payloads_key = "queue:encoded:#{queue}"
payload = MultiJson.encode(item)
encoded_payload = Base64.encode64(payload)
return if push_unique_only && already_queued?(encoded_payloads_key, encoded_payload)

redis.multi do
redis.sadd(encoded_payloads_key, encoded_payload)
redis.rpush(queue_key, payload)
end
end

# Please use .push if possible instead.
Expand All @@ -44,5 +58,9 @@ def self.enqueue(klass, *args)
queue = (klass.respond_to?(:queue) && klass.queue) || 'default'
push(queue, { 'class' => klass.name, 'args' => args })
end

def self.already_queued?(queue_key, encoded_payload)
redis.sismember(queue_key, encoded_payload)
end
end
end
8 changes: 6 additions & 2 deletions lib/sidekiq/manager.rb
Expand Up @@ -83,11 +83,15 @@ def processor_died(processor, reason)

def find_work(queue_idx)
current_queue = @queues[queue_idx]
msg = @redis.lpop("queue:#{current_queue}")
queue_key = "queue:#{current_queue}"
encoded_payloads_key = "queue:encoded:#{current_queue}"
msg = @redis.lpop(queue_key)
if msg
payload = MultiJson.decode(msg)
@redis.srem(encoded_payloads_key, Base64.encode64(msg))
processor = @ready.pop
@busy << processor
processor.process! MultiJson.decode(msg)
processor.process!(payload)
end
!!msg
end
Expand Down
22 changes: 22 additions & 0 deletions test/test_client.rb
Expand Up @@ -3,10 +3,32 @@
require 'sidekiq/worker'

class TestClient < MiniTest::Unit::TestCase
describe 'with real redis' do
before do
Sidekiq::Client.redis = Redis.connect(:url => 'redis://localhost/sidekiq_test')
Sidekiq::Client.redis.flushdb
end

it 'does not push duplicate messages when configured for unique only' do
Sidekiq::Client.push_unique_only = true
10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) }
assert_equal Sidekiq::Client.redis.llen("queue:customqueue"), 1
end

it 'does push duplicate messages when not configured for unique only' do
Sidekiq::Client.push_unique_only = false
10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) }
assert_equal Sidekiq::Client.redis.llen("queue:customqueue2"), 10
end
end

describe 'with mock redis' do
before do
@redis = MiniTest::Mock.new
def @redis.multi; yield; end
def @redis.sadd(*); true; end
Sidekiq::Client.redis = @redis
Sidekiq::Client.push_unique_only = false
end

it 'raises ArgumentError with invalid params' do
Expand Down
2 changes: 1 addition & 1 deletion test/test_manager.rb
Expand Up @@ -30,7 +30,7 @@ def perform(a, b)
q << 'done' if $processed == 2
end
mgr.start!
result = q.timed_pop
result = q.timed_pop(1.0)
assert_equal 'done', result
mgr.stop
end
Expand Down

0 comments on commit 62045af

Please sign in to comment.