-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
digests.rb
144 lines (124 loc) · 4.39 KB
/
digests.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# frozen_string_literal: true
module SidekiqUniqueJobs
# Utility module to help manage unique digests in redis.
#
# @author Mikael Henriksson <mikael@zoolutions.se>
module Digests
# Default `count` used by all, del and delete_by_pattern when the caller gives none
DEFAULT_COUNT = 1_000
# Default match pattern used by all and page when the caller gives none
SCAN_PATTERN = "*"
# Number of digests batch_delete sends per pipelined slice
CHUNK_SIZE = 100
include SidekiqUniqueJobs::Logging
include SidekiqUniqueJobs::Connection
extend self
# Collect the members of the unique digest set that match a pattern
#
# @param [String] pattern forwarded to the set scan as its `match:` option
# @param [Integer] count forwarded to the set scan as its `count:` option
#   NOTE(review): Redis describes SSCAN's COUNT as a per-iteration hint,
#   not a cap on the number of results - confirm the intended meaning
# @return [Array<String>] the matching unique digests
def all(pattern: SCAN_PATTERN, count: DEFAULT_COUNT)
  redis do |conn|
    matches = conn.sscan_each(UNIQUE_SET, match: pattern, count: count)
    matches.to_a
  end
end
# Fetch one page of unique digests together with the total set size
#
# The cardinality lookup and the scan are issued inside a single `multi` block.
#
# @param [String] pattern forwarded to the scan as its `match:` option
# @param [Integer] cursor the scan cursor to continue from (0 starts a new scan)
# @param [Integer] page_size forwarded to the scan as its `count:` option
#
# @return [Array] a three element array: the total size of the set followed by
#   the first two elements of the scan reply (presumably the next cursor and
#   the digests of this page, per the redis client's sscan reply - confirm)
def page(pattern: SCAN_PATTERN, cursor: 0, page_size: 100)
  redis do |conn|
    set_size, scan_reply = conn.multi do
      conn.scard(UNIQUE_SET)
      conn.sscan(UNIQUE_SET, cursor, match: pattern, count: page_size)
    end

    [set_size, *scan_reply.values_at(0, 1)]
  end
end
# Total number of unique digests currently registered
#
# @return [Integer] cardinality of the unique digest set
def count
  redis do |conn|
    conn.scard(UNIQUE_SET)
  end
end
# Deletes unique digests either by a full digest or by a pattern
#
# A deprecation warning is emitted on every call. When both arguments are
# given the pattern takes precedence.
#
# @deprecated call delete_by_digest or delete_by_pattern directly
# @param [String] digest the full digest to delete
# @param [String] pattern a key pattern to match with
# @param [Integer] count forwarded to delete_by_pattern as `count:`
# @raise [ArgumentError] when both pattern and digest are nil
# @return whatever delete_by_pattern or delete_by_digest returns
def del(digest: nil, pattern: nil, count: DEFAULT_COUNT)
  warn("#{self}.#{__method__} has been deprecated and will be removed in a future version")

  if pattern
    delete_by_pattern(pattern, count: count)
  elsif digest
    delete_by_digest(digest)
  else
    raise ArgumentError, "either digest or pattern need to be provided"
  end
end
# Deletes a single unique digest and its related keys through the
# :delete_by_digest script, then logs how long the call took
#
# @param [String] digest the full digest to delete
# @return the value of `count` evaluated right after the script has run
def delete_by_digest(digest)
  suffixes = %w[
    EXISTS GRABBED AVAILABLE VERSION
    RUN RUN:EXISTS RUN:GRABBED RUN:AVAILABLE RUN:VERSION
  ]
  script_keys = [UNIQUE_SET, digest] + suffixes.map { |suffix| "#{digest}:#{suffix}" }

  remaining, elapsed = timed do
    Scripts.call(:delete_by_digest, nil, keys: script_keys)
    count
  end

  log_info("#{__method__}(#{digest}) completed in #{elapsed}ms")
  remaining
end
# Deletes every unique digest matching a pattern and logs how long it took
#
# @param [String] pattern a key pattern to match with
# @param [Integer] count forwarded to `all` as its `count:` argument
# @return [Integer] the number of digests that matched and were handed to batch_delete
def delete_by_pattern(pattern, count: DEFAULT_COUNT)
  deleted, elapsed = timed do
    all(pattern: pattern, count: count).tap { |matches| batch_delete(matches) }.size
  end

  log_info("#{__method__}(#{pattern}, count: #{count}) completed in #{elapsed}ms")
  deleted
end
private
# Removes the given digests from the unique set and deletes their related keys
#
# Digests are processed in slices of CHUNK_SIZE, one pipelined request per slice.
# The suffix list mirrors the keys handed to the :delete_by_digest script so
# that deleting by pattern cleans up the same keys as deleting by digest
# (the bare "<digest>:RUN" key was previously left behind here).
#
# @param [Array<String>] digests the digests to delete
def batch_delete(digests) # rubocop:disable Metrics/MethodLength
  suffixes = %w[
    EXISTS GRABBED VERSION AVAILABLE
    RUN RUN:EXISTS RUN:GRABBED RUN:VERSION RUN:AVAILABLE
  ]

  redis do |conn|
    digests.each_slice(CHUNK_SIZE) do |chunk|
      conn.pipelined do
        chunk.each do |digest|
          conn.del(digest)
          conn.srem(UNIQUE_SET, digest)
          # One DEL per key keeps the command shape of the original implementation
          suffixes.each { |suffix| conn.del("#{digest}:#{suffix}") }
        end
      end
    end
  end
end
# Runs the given block and measures how long it took
#
# @yield the work to measure
# @return [Array] the block's result followed by the elapsed time in
#   milliseconds, rounded to two decimals
def timed
  started_at = current_time
  result = yield
  # The difference between two clock readings is in seconds, while callers
  # log the value with an "ms" suffix, so convert before rounding.
  elapsed_ms = ((current_time - started_at) * 1000).round(2)
  [result, elapsed_ms]
end
# Clock reading used by timed for its start and end timestamps
#
# @return [Time] the current time as reported by Time.now
def current_time
Time.now
end
end
end