From bea7638c1462776679d124e7f22f1fb3d1772190 Mon Sep 17 00:00:00 2001
From: Dome-GER
Date: Sun, 30 Jan 2022 11:21:24 +0100
Subject: [PATCH] Reconnect on known errors after failover when pushing jobs to Redis

In a Redis cluster setup, failovers will happen. In these cases a
`Redis::CommandError` can be raised for different reasons, for example
when the server becomes a replica, when there is a "Not enough replicas"
error from the primary, or when a blocking command is force-unblocked.

These errors can occur when pushing a job to Redis, so it needs to
reconnect to the current master node and retry. Otherwise, these jobs
are lost.

The retry logic is similar to the implementation for `Sidekiq.redis`.
---
 lib/sidekiq.rb        |  1 +
 lib/sidekiq/client.rb | 19 +++++++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb
index 0761c2984..76113a66d 100644
--- a/lib/sidekiq.rb
+++ b/lib/sidekiq.rb
@@ -103,6 +103,7 @@ def self.redis
         # to disconnect and reopen the socket to get back to the primary.
         # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
         # 4985 Use the same logic when a blocking command is force-unblocked
+        # The same retry logic is also used in client.rb
         if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
           conn.disconnect!
           retryable = false
diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb
index 9a79ba22f..74feb56b1 100644
--- a/lib/sidekiq/client.rb
+++ b/lib/sidekiq/client.rb
@@ -189,8 +189,23 @@ def enqueue_in(interval, klass, *args)
 
     def raw_push(payloads)
       @redis_pool.with do |conn|
-        conn.pipelined do |pipeline|
-          atomic_push(pipeline, payloads)
+        retryable = true
+        begin
+          conn.pipelined do |pipeline|
+            atomic_push(pipeline, payloads)
+          end
+        rescue Redis::BaseError => ex
+          # 2550 Failover can cause the server to become a replica, need
+          # to disconnect and reopen the socket to get back to the primary.
+          # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
+          # 4985 Use the same logic when a blocking command is force-unblocked
+          # The retry logic is copied from sidekiq.rb
+          if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
+            conn.disconnect!
+            retryable = false
+            retry
+          end
+          raise
         end
       end
       true
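
The retry-once behaviour in this patch can be tried without a Redis server. The sketch below is only an illustration: `FakeRedisError`, `FakeConn`, and `push_with_retry` are hypothetical stand-ins that are not part of Sidekiq or redis-rb, and the error strings are made up. It copies the control flow of the patched `raw_push`: one reconnect and one retry on a `READONLY`, `NOREPLICAS`, or `UNBLOCKED` message, and a re-raise in every other case.

```ruby
# Hypothetical stand-ins for illustration; not part of Sidekiq or redis-rb.
class FakeRedisError < StandardError; end

class FakeConn
  attr_reader :disconnects, :pushed

  def initialize(failures)
    @failures = failures # error messages to raise, one per pipelined call
    @disconnects = 0
    @pushed = []
  end

  # Mimics a pipelined push: raises the next queued failure, if any,
  # otherwise yields the array that collects pushed payloads.
  def pipelined
    msg = @failures.shift
    raise FakeRedisError, msg if msg
    yield @pushed
  end

  def disconnect!
    @disconnects += 1
  end
end

# Same shape as the patched raw_push: retry once after a failover-related
# error, re-raise anything else (or a second failure).
def push_with_retry(conn, payloads)
  retryable = true
  begin
    conn.pipelined { |pipeline| pipeline.concat(payloads) }
  rescue FakeRedisError => ex
    if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
      conn.disconnect!
      retryable = false
      retry
    end
    raise
  end
  true
end

# First call fails as if the node had just become a replica; the retry succeeds.
conn = FakeConn.new(["READONLY simulated failover"])
push_with_retry(conn, ["job-1"])
```

Because `retryable` is set before `begin`, `retry` does not reset it, so a second failure in a row propagates to the caller instead of looping.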