Skip to content

Commit

Permalink
Merge pull request #1227 from supercaracal/add-sharded-pubsub-support
Browse files Browse the repository at this point in the history
Add sharded Pub/Sub support for cluster
  • Loading branch information
byroot committed Oct 9, 2023
2 parents ccdf15f + 54e6a7e commit 230a5c4
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 1 deletion.
35 changes: 35 additions & 0 deletions cluster/test/commands_on_pub_sub_test.rb
Expand Up @@ -70,4 +70,39 @@ def test_publish_psubscribe_punsubscribe_pubsub
assert_equal('two', messages['gucci2'])
assert_equal('three', messages['hermes3'])
end

def test_spublish_ssubscribe_sunsubscribe_pubsub
omit_version('7.0.0')

sub_cnt = 0
messages = {}

thread = Thread.new do
redis.ssubscribe('channel1', 'channel2') do |on|
on.ssubscribe { sub_cnt += 1 }
on.smessage do |c, msg|
messages[c] = msg
redis.sunsubscribe if messages.size == 2
end
end
end

Thread.pass until sub_cnt == 2

publisher = build_another_client

assert_equal %w[channel1 channel2], publisher.pubsub(:shardchannels, 'channel*')
assert_equal({ 'channel1' => 1, 'channel2' => 1, 'channel3' => 0 },
publisher.pubsub(:shardnumsub, 'channel1', 'channel2', 'channel3'))

publisher.spublish('channel1', 'one')
publisher.spublish('channel2', 'two')
publisher.spublish('channel3', 'three')

thread.join

assert_equal(2, messages.size)
assert_equal('one', messages['channel1'])
assert_equal('two', messages['channel2'])
end
end
21 changes: 21 additions & 0 deletions lib/redis/commands/pubsub.rb
Expand Up @@ -49,6 +49,27 @@ def punsubscribe(*channels)
def pubsub(subcommand, *args)
send_command([:pubsub, subcommand] + args)
end

# Post a message to a channel in a shard.
def spublish(channel, message)
send_command([:spublish, channel, message])
end

# Listen for messages published to the given channels in a shard.
def ssubscribe(*channels, &block)
_subscription(:ssubscribe, 0, channels, block)
end

# Listen for messages published to the given channels in a shard.
# Throw a timeout error if there is no messages for a timeout period.
def ssubscribe_with_timeout(timeout, *channels, &block)
_subscription(:ssubscribe_with_timeout, timeout, channels, block)
end

# Stop listening for messages posted to the given channels in a shard.
def sunsubscribe(*channels)
_subscription(:sunsubscribe, 0, channels, nil)
end
end
end
end
30 changes: 29 additions & 1 deletion lib/redis/subscribe.rb
Expand Up @@ -29,6 +29,14 @@ def psubscribe_with_timeout(timeout, *channels, &block)
subscription("psubscribe", "punsubscribe", channels, block, timeout)
end

def ssubscribe(*channels, &block)
subscription("ssubscribe", "sunsubscribe", channels, block)
end

def ssubscribe_with_timeout(timeout, *channels, &block)
subscription("ssubscribe", "sunsubscribe", channels, block, timeout)
end

def unsubscribe(*channels)
call_v([:unsubscribe, *channels])
end
Expand All @@ -37,6 +45,10 @@ def punsubscribe(*channels)
call_v([:punsubscribe, *channels])
end

def sunsubscribe(*channels)
call_v([:sunsubscribe, *channels])
end

def close
@client.close
end
Expand All @@ -46,7 +58,11 @@ def close
def subscription(start, stop, channels, block, timeout = 0)
sub = Subscription.new(&block)

call_v([start, *channels])
case start
when "ssubscribe" then channels.each { |c| call_v([start, c]) } # avoid cross-slot keys
else call_v([start, *channels])
end

while event = @client.next_event(timeout)
if event.is_a?(::RedisClient::CommandError)
raise Client::ERROR_MAPPING.fetch(event.class), event.message
Expand Down Expand Up @@ -94,5 +110,17 @@ def punsubscribe(&block)
def pmessage(&block)
@callbacks["pmessage"] = block
end

def ssubscribe(&block)
@callbacks["ssubscribe"] = block
end

def sunsubscribe(&block)
@callbacks["sunsubscribe"] = block
end

def smessage(&block)
@callbacks["smessage"] = block
end
end
end

0 comments on commit 230a5c4

Please sign in to comment.