Skip to content

Commit

Permalink
Fix linting (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
tgxworld committed Aug 14, 2023
1 parent 876bcea commit 67163f5
Showing 1 changed file with 63 additions and 77 deletions.
140 changes: 63 additions & 77 deletions lib/message_bus/backends/redis.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true

require 'redis'
require 'digest'
require "redis"
require "digest"

module MessageBus
module Backends
Expand Down Expand Up @@ -49,9 +49,7 @@ def initialize(redis_config = {}, max_backlog_size = 1000)
@redis_config = redis_config.dup
@clear_every = redis_config.delete(:clear_every) || 1
@logger = @redis_config[:logger]
unless @redis_config[:enable_redis_logger]
@redis_config[:logger] = nil
end
@redis_config[:logger] = nil unless @redis_config[:enable_redis_logger]
@max_backlog_size = max_backlog_size
@max_global_backlog_size = 2000
@max_in_memory_publish_backlog = 1000
Expand All @@ -61,7 +59,7 @@ def initialize(redis_config = {}, max_backlog_size = 1000)
@pub_redis = nil
@subscribed = false
# after 7 days inactive backlogs will be removed
@max_backlog_age = 604800
@max_backlog_age = 604_800
end

# Reconnects to Redis; used after a process fork, typically triggered by a forking webserver
Expand All @@ -72,9 +70,7 @@ def after_fork

# (see Base#reset!)
def reset!
  # Remove every MessageBus-owned key (all keys share the "__mb_" prefix).
  # KEYS returns the full matching set up front; each key is then deleted
  # individually. Diff residue duplicated this deletion pass — one pass
  # is sufficient.
  pub_redis.keys("__mb_*").each { |k| pub_redis.del k }
end

# (see Base#destroy)
Expand All @@ -85,9 +81,7 @@ def destroy
# Deletes all backlogs and their data. Does not delete ID pointers, so new publications will get IDs that continue from the last publication before the expiry. Use with extreme caution.
# @see Base#expire_all_backlogs!
def expire_all_backlogs!
  # Delete only backlog data keys (suffix "backlog_n"); ID pointer keys are
  # intentionally left intact so new publications continue the ID sequence.
  # Diff residue duplicated this deletion pass — one pass is sufficient.
  pub_redis.keys("__mb_*backlog_n").each { |k| pub_redis.del k }
end

# Note, the script takes care of all expiry of keys, however
Expand Down Expand Up @@ -157,23 +151,25 @@ def publish(channel, data, opts = nil)
max_backlog_size,
max_global_backlog_size,
channel,
clear_every
clear_every,
],
keys: [
global_id_key,
backlog_id_key,
backlog_key,
global_backlog_key,
redis_channel_name
]
redis_channel_name,
],
)
rescue ::Redis::CommandError => e
if queue_in_memory && e.message =~ /READONLY/
@lock.synchronize do
@in_memory_backlog << [channel, data]
if @in_memory_backlog.length > @max_in_memory_publish_backlog
@in_memory_backlog.delete_at(0)
@logger.warn("Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}")
@logger.warn(
"Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}",
)
end
end

Expand Down Expand Up @@ -209,9 +205,7 @@ def backlog(channel, last_id = 0)
backlog_key = backlog_key(channel)
items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"

items.map do |i|
MessageBus::Message.decode(i)
end
items.map { |i| MessageBus::Message.decode(i) }
end

# (see Base#global_backlog)
Expand Down Expand Up @@ -254,13 +248,9 @@ def subscribe(channel, last_id = nil)
# we are subscribing on global and global is always going to be bigger than local
# so worst case is a replay of a few messages
message = get_message(channel, last_id)
if message
last_id = message.global_id
end
end
global_subscribe(last_id) do |m|
yield m if m.channel == channel
last_id = message.global_id if message
end
global_subscribe(last_id) { |m| yield m if m.channel == channel }
end

# (see Base#global_unsubscribe)
Expand All @@ -280,36 +270,31 @@ def global_subscribe(last_id = nil, &blk)

highest_id = last_id

clear_backlog = lambda do
retries = 4
begin
highest_id = process_global_backlog(highest_id, retries > 0, &blk)
rescue BackLogOutOfOrder => e
highest_id = e.highest_id
retries -= 1
sleep(rand(50) / 1000.0)
retry
clear_backlog =
lambda do
retries = 4
begin
highest_id = process_global_backlog(highest_id, retries > 0, &blk)
rescue BackLogOutOfOrder => e
highest_id = e.highest_id
retries -= 1
sleep(rand(50) / 1000.0)
retry
end
end
end

begin
global_redis = new_redis_connection

if highest_id
clear_backlog.call(&blk)
end
clear_backlog.call(&blk) if highest_id

global_redis.subscribe(redis_channel_name) do |on|
on.subscribe do
if highest_id
clear_backlog.call(&blk)
end
clear_backlog.call(&blk) if highest_id
@subscribed = true
end

on.unsubscribe do
@subscribed = false
end
on.unsubscribe { @subscribed = false }

on.message do |_c, m|
if m == UNSUB_MESSAGE
Expand Down Expand Up @@ -346,29 +331,30 @@ def global_subscribe(last_id = nil, &blk)
private

def new_redis_connection
  # Builds a fresh Redis client from @redis_config, stripping the
  # MessageBus-specific options first.
  #
  # This is not ideal, but required for Redis gem version 5:
  # redis-client no longer accepts arbitrary params — anything unknown
  # will error out.
  # https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39
  #
  # We should be doing the opposite and allowlisting params, or splitting
  # the object up. Starting with the smallest change that is backwards
  # compatible.
  #
  # NOTE: diff residue assigned `config` twice (old and new filter forms);
  # a single blocklist pass is sufficient. `reject` expresses the negated
  # membership test directly; `_v` marks the unused value param.
  config =
    @redis_config.reject do |k, _v|
      %i[
        backend
        logger
        long_polling_enabled
        long_polling_interval
        backend_options
        base_route
        client_message_filters
        site_id_lookup
        group_ids_lookup
        user_id_lookup
        transport_codec
      ].include?(k)
    end
  ::Redis.new(config)
end

Expand Down Expand Up @@ -399,9 +385,7 @@ def global_backlog_key
end

def process_global_backlog(highest_id, raise_error)
if highest_id > pub_redis.get(global_id_key).to_i
highest_id = 0
end
highest_id = 0 if highest_id > pub_redis.get(global_id_key).to_i

global_backlog(highest_id).each do |old|
if highest_id + 1 == old.global_id
Expand Down Expand Up @@ -444,19 +428,21 @@ def ensure_backlog_flushed
if e.message =~ /^READONLY/
try_again = true
else
@logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}")
@logger.warn(
"Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}",
)
end
rescue => e
@logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}")
@logger.warn(
"Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}",
)
end

@in_memory_backlog.delete_at(0) unless try_again
end
end
ensure
@lock.synchronize do
@flush_backlog_thread = nil
end
@lock.synchronize { @flush_backlog_thread = nil }
end

def cached_eval(redis, script, script_sha1, params)
Expand All @@ -479,10 +465,10 @@ def is_readonly?
# in case we are not connected to the correct server
# which can happen when sharing ips
pub_redis.disconnect!
pub_redis.set(key, '1')
pub_redis.set(key, "1")
false
rescue ::Redis::CommandError => e
return true if e.message =~ /^READONLY/
true if e.message =~ /^READONLY/
end
end

Expand Down

0 comments on commit 67163f5

Please sign in to comment.