From 67163f5de7a453078db02a441a6285a901d0e7dd Mon Sep 17 00:00:00 2001 From: Alan Guo Xiang Tan Date: Mon, 14 Aug 2023 08:17:11 +0800 Subject: [PATCH] Fix linting (#343) --- lib/message_bus/backends/redis.rb | 140 ++++++++++++++---------------- 1 file changed, 63 insertions(+), 77 deletions(-) diff --git a/lib/message_bus/backends/redis.rb b/lib/message_bus/backends/redis.rb index d4b40308..bce6c14c 100644 --- a/lib/message_bus/backends/redis.rb +++ b/lib/message_bus/backends/redis.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true -require 'redis' -require 'digest' +require "redis" +require "digest" module MessageBus module Backends @@ -49,9 +49,7 @@ def initialize(redis_config = {}, max_backlog_size = 1000) @redis_config = redis_config.dup @clear_every = redis_config.delete(:clear_every) || 1 @logger = @redis_config[:logger] - unless @redis_config[:enable_redis_logger] - @redis_config[:logger] = nil - end + @redis_config[:logger] = nil unless @redis_config[:enable_redis_logger] @max_backlog_size = max_backlog_size @max_global_backlog_size = 2000 @max_in_memory_publish_backlog = 1000 @@ -61,7 +59,7 @@ def initialize(redis_config = {}, max_backlog_size = 1000) @pub_redis = nil @subscribed = false # after 7 days inactive backlogs will be removed - @max_backlog_age = 604800 + @max_backlog_age = 604_800 end # Reconnects to Redis; used after a process fork, typically triggered by a forking webserver @@ -72,9 +70,7 @@ def after_fork # (see Base#reset!) def reset! - pub_redis.keys("__mb_*").each do |k| - pub_redis.del k - end + pub_redis.keys("__mb_*").each { |k| pub_redis.del k } end # (see Base#destroy) @@ -85,9 +81,7 @@ def destroy # Deletes all backlogs and their data. Does not delete ID pointers, so new publications will get IDs that continue from the last publication before the expiry. Use with extreme caution. # @see Base#expire_all_backlogs! def expire_all_backlogs! - pub_redis.keys("__mb_*backlog_n").each do |k| - pub_redis.del k - end + pub_redis.keys("__mb_*backlog_n").each { |k| pub_redis.del k } end # Note, the script takes care of all expiry of keys, however @@ -157,15 +151,15 @@ def publish(channel, data, opts = nil) max_backlog_size, max_global_backlog_size, channel, - clear_every + clear_every, ], keys: [ global_id_key, backlog_id_key, backlog_key, global_backlog_key, - redis_channel_name - ] + redis_channel_name, + ], ) rescue ::Redis::CommandError => e if queue_in_memory && e.message =~ /READONLY/ @@ -173,7 +167,9 @@ def publish(channel, data, opts = nil) @in_memory_backlog << [channel, data] if @in_memory_backlog.length > @max_in_memory_publish_backlog @in_memory_backlog.delete_at(0) - @logger.warn("Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}") + @logger.warn( + "Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}", + ) end end @@ -209,9 +205,7 @@ def backlog(channel, last_id = 0) backlog_key = backlog_key(channel) items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf" - items.map do |i| - MessageBus::Message.decode(i) - end + items.map { |i| MessageBus::Message.decode(i) } end # (see Base#global_backlog) @@ -254,13 +248,9 @@ def subscribe(channel, last_id = nil) # we are subscribing on global and global is always going to be bigger than local # so worst case is a replay of a few messages message = get_message(channel, last_id) - if message - last_id = message.global_id - end - end - global_subscribe(last_id) do |m| - yield m if m.channel == channel + last_id = message.global_id if message end + global_subscribe(last_id) { |m| yield m if m.channel == channel } end # (see Base#global_unsubscribe) @@ -280,36 +270,31 @@ def global_subscribe(last_id = nil, &blk) highest_id = last_id - clear_backlog = lambda do - retries = 4 - begin - highest_id = process_global_backlog(highest_id, retries > 0, &blk) - rescue BackLogOutOfOrder => e - highest_id = e.highest_id - retries -= 1 - sleep(rand(50) / 1000.0) - retry + clear_backlog = + lambda do + retries = 4 + begin + highest_id = process_global_backlog(highest_id, retries > 0, &blk) + rescue BackLogOutOfOrder => e + highest_id = e.highest_id + retries -= 1 + sleep(rand(50) / 1000.0) + retry + end end - end begin global_redis = new_redis_connection - if highest_id - clear_backlog.call(&blk) - end + clear_backlog.call(&blk) if highest_id global_redis.subscribe(redis_channel_name) do |on| on.subscribe do - if highest_id - clear_backlog.call(&blk) - end + clear_backlog.call(&blk) if highest_id @subscribed = true end - on.unsubscribe do - @subscribed = false - end + on.unsubscribe { @subscribed = false } on.message do |_c, m| if m == UNSUB_MESSAGE @@ -346,29 +331,30 @@ def global_subscribe(last_id = nil, &blk) private def new_redis_connection - config = @redis_config.filter do |k, v| - # This is not ideal, required for Redis gem version 5 - # redis-client no longer accepts arbitrary params - # anything unknown will error out. - # https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39 - # - # - # We should be doing the opposite and allowlisting params - # or splitting the object up. Starting with the smallest change that is backwards compatible - ![ - :backend, - :logger, - :long_polling_enabled, - :long_polling_interval, - :backend_options, - :base_route, - :client_message_filters, - :site_id_lookup, - :group_ids_lookup, - :user_id_lookup, - :transport_codec - ].include?(k) - end + config = + @redis_config.filter do |k, v| + # This is not ideal, required for Redis gem version 5 + # redis-client no longer accepts arbitrary params + # anything unknown will error out. + # https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39 + # + # + # We should be doing the opposite and allowlisting params + # or splitting the object up. Starting with the smallest change that is backwards compatible + !%i[ + backend + logger + long_polling_enabled + long_polling_interval + backend_options + base_route + client_message_filters + site_id_lookup + group_ids_lookup + user_id_lookup + transport_codec + ].include?(k) + end ::Redis.new(config) end @@ -399,9 +385,7 @@ def global_backlog_key end def process_global_backlog(highest_id, raise_error) - if highest_id > pub_redis.get(global_id_key).to_i - highest_id = 0 - end + highest_id = 0 if highest_id > pub_redis.get(global_id_key).to_i global_backlog(highest_id).each do |old| if highest_id + 1 == old.global_id @@ -444,19 +428,21 @@ def ensure_backlog_flushed if e.message =~ /^READONLY/ try_again = true else - @logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}") + @logger.warn( + "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}", + ) end rescue => e - @logger.warn("Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}") + @logger.warn( + "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join('\n')}", + ) end @in_memory_backlog.delete_at(0) unless try_again end end ensure - @lock.synchronize do - @flush_backlog_thread = nil - end + @lock.synchronize { @flush_backlog_thread = nil } end def cached_eval(redis, script, script_sha1, params) @@ -479,10 +465,10 @@ def is_readonly? # in case we are not connected to the correct server # which can happen when sharing ips pub_redis.disconnect! - pub_redis.set(key, '1') + pub_redis.set(key, "1") false rescue ::Redis::CommandError => e - return true if e.message =~ /^READONLY/ + true if e.message =~ /^READONLY/ end end