From 62045af3d50087c288ed7a9feab9dc3e8fc0bdcc Mon Sep 17 00:00:00 2001 From: Ryan LeCompte Date: Tue, 7 Feb 2012 03:29:09 -0800 Subject: [PATCH] add unique payloads support, addresses issue #4 --- lib/sidekiq/client.rb | 20 +++++++++++++++++++- lib/sidekiq/manager.rb | 8 ++++++-- test/test_client.rb | 22 ++++++++++++++++++++++ test/test_manager.rb | 2 +- 4 files changed, 48 insertions(+), 4 deletions(-) diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 372afcded..01c519da8 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -1,9 +1,14 @@ require 'multi_json' require 'redis' +require 'base64' module Sidekiq class Client + class << self + attr_accessor :push_unique_only + end + def self.redis @redis ||= begin # autoconfig for Heroku @@ -24,7 +29,16 @@ def self.push(queue='default', item) raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args'] item['class'] = item['class'].to_s if !item['class'].is_a?(String) - redis.rpush("queue:#{queue}", MultiJson.encode(item)) + queue_key = "queue:#{queue}" + encoded_payloads_key = "queue:encoded:#{queue}" + payload = MultiJson.encode(item) + encoded_payload = Base64.encode64(payload) + return if push_unique_only && already_queued?(encoded_payloads_key, encoded_payload) + + redis.multi do + redis.sadd(encoded_payloads_key, encoded_payload) + redis.rpush(queue_key, payload) + end end # Please use .push if possible instead. @@ -44,5 +58,9 @@ def self.enqueue(klass, *args) queue = (klass.respond_to?(:queue) && klass.queue) || 'default' push(queue, { 'class' => klass.name, 'args' => args }) end + + def self.already_queued?(queue_key, encoded_payload) + redis.sismember(queue_key, encoded_payload) + end end end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 8be2f428c..12f60d928 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -83,11 +83,15 @@ def processor_died(processor, reason) def find_work(queue_idx) current_queue = @queues[queue_idx] - msg = @redis.lpop("queue:#{current_queue}") + queue_key = "queue:#{current_queue}" + encoded_payloads_key = "queue:encoded:#{current_queue}" + msg = @redis.lpop(queue_key) if msg + payload = MultiJson.decode(msg) + @redis.srem(encoded_payloads_key, Base64.encode64(msg)) processor = @ready.pop @busy << processor - processor.process! MultiJson.decode(msg) + processor.process!(payload) end !!msg end diff --git a/test/test_client.rb b/test/test_client.rb index d36d7f308..5add7742e 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -3,10 +3,32 @@ require 'sidekiq/worker' class TestClient < MiniTest::Unit::TestCase + describe 'with real redis' do + before do + Sidekiq::Client.redis = Redis.connect(:url => 'redis://localhost/sidekiq_test') + Sidekiq::Client.redis.flushdb + end + + it 'does not push duplicate messages when configured for unique only' do + Sidekiq::Client.push_unique_only = true + 10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) } + assert_equal Sidekiq::Client.redis.llen("queue:customqueue"), 1 + end + + it 'does push duplicate messages when not configured for unique only' do + Sidekiq::Client.push_unique_only = false + 10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) } + assert_equal Sidekiq::Client.redis.llen("queue:customqueue2"), 10 + end + end + describe 'with mock redis' do before do @redis = MiniTest::Mock.new + def @redis.multi; yield; end + def @redis.sadd(*); true; end Sidekiq::Client.redis = @redis + Sidekiq::Client.push_unique_only = false end it 'raises ArgumentError with invalid params' do diff --git a/test/test_manager.rb b/test/test_manager.rb index 816963ac5..bc060a79a 100644 --- a/test/test_manager.rb +++ b/test/test_manager.rb @@ -30,7 +30,7 @@ def perform(a, b) q << 'done' if $processed == 2 end mgr.start! - result = q.timed_pop + result = q.timed_pop(1.0) assert_equal 'done', result mgr.stop end