WIP: first attempt at petergoldstein#776 / petergoldstein#941
Martin Sander committed Dec 10, 2022
1 parent bcfe112 commit d31d9b2
Showing 3 changed files with 111 additions and 94 deletions.
171 changes: 90 additions & 81 deletions lib/dalli/pipelined_getter.rb
@@ -17,9 +17,8 @@ def process(keys, &block)
return {} if keys.empty?

@ring.lock do
servers = setup_requests(keys)
start_time = Time.now
servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty?
requests = setup_requests(keys)
fetch_responses(requests, @ring.socket_timeout, &block)
end
rescue NetworkError => e
Dalli.logger.debug { e.inspect }
@@ -28,57 +27,16 @@ def process(keys, &block)
end

def setup_requests(keys)
groups = groups_for_keys(keys)
make_getkq_requests(groups)

# TODO: How does this exit on a NetworkError
finish_queries(groups.keys)
end

##
# Loop through the server-grouped sets of keys, writing
# the corresponding getkq requests to the appropriate servers
#
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
##
def make_getkq_requests(groups)
groups.each do |server, keys_for_server|
server.request(:pipelined_get, keys_for_server)
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
end
end

##
# This loops through the servers that have keys in
# our set, sending the noop to terminate the set of queries.
##
def finish_queries(servers)
deleted = []

servers.each do |server|
next unless server.alive?

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
deleted.append(server)
end
end

servers.delete_if { |server| deleted.include?(server) }
rescue Dalli::NetworkError
abort_without_timeout(servers)
raise
groups_for_keys(keys).map do |server, keys|
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
[server, server.pipelined_get_request(keys)]
end.to_h
end
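
The comment above suggests switching from getkq to getq and matching responses via the opaque field. A rough, hypothetical sketch of that idea (not part of this commit; GETQ_OPCODE and pack_getq_request are illustrative names, and the 24-byte header follows the memcached binary protocol):

GETQ_OPCODE = 0x09 # memcached binary protocol "GetQ" (quiet get, key not echoed back)

def pack_getq_request(key, opaque)
  # magic, opcode, key length, extras length, data type, vbucket,
  # total body length, opaque, cas -- followed by the key itself
  [0x80, GETQ_OPCODE, key.bytesize, 0, 0, 0, key.bytesize, opaque, 0].pack('CCnCCnNNQ>') + key
end

keys = %w[alpha beta gamma]
# Use each key's index as the opaque, so responses can be matched to keys
# without the server echoing the key back (which is what getkq pays for).
request = keys.each_with_index.map { |k, i| pack_getq_request(k, i) }.join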

def finish_query_for_server(server)
server.pipeline_response_setup
server.finish_pipeline_request
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError => e
@@ -92,31 +50,99 @@ def abort_without_timeout(servers)
servers.each(&:pipeline_abort)
end

def fetch_responses(servers, start_time, timeout, &block)
def fetch_responses(requests, timeout, &block)
# FIXME: this was here. Why? Where should it go?
# Remove any servers which are not connected
servers.delete_if { |s| !s.connected? }
return [] if servers.empty?
# servers.delete_if { |s| !s.connected? }

time_left = remaining_time(start_time, timeout)
readable_servers = servers_with_response(servers, time_left)
if readable_servers.empty?
abort_with_timeout(servers)
return []
start_time = Time.now
servers = requests.keys

# FIXME: this was executed before the finish request was sent. Why?
servers.delete_if { |s| !s.alive? }

servers.each do |server|
# could be postponed to after the first write
server.pipeline_response_setup
end

# Loop through the servers with responses, and
# delete any from our list that are finished
readable_servers.each do |server|
servers.delete(server) if process_server(server, &block)
until servers.empty?
time_left = remaining_time(start_time, timeout)
servers = read_write_select(servers, requests, time_left, &block)
end
servers
rescue NetworkError
# Abort and raise if we encountered a network error. This triggers
# a retry at the top level.
abort_without_timeout(servers)
raise
end

def read_write_select(servers, requests, time_left, &block)
# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, writable, = IO.select(server_map.keys, server_map.keys,
nil, time_left)

if readable.nil?
abort_with_timeout(servers)
return []
end

writable.each do |socket|
server = server_map[socket]
process_writable(socket, server, requests)
end

readable.each do |socket|
server = server_map[socket]

if process_server(server, &block)
servers.delete(server)
end
end

servers
end
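
For reference, the read/write form of IO.select used above returns the subsets of sockets that are ready for reading and writing, and nil when the timeout elapses with nothing ready. A minimal standalone illustration (assumes a Unix-like system):

require 'socket'

reader, writer = UNIXSocket.pair
writer.write('x')

# Block for at most 0.5s waiting for read- or write-readiness.
readable, writable, = IO.select([reader], [writer], nil, 0.5)
readable # => [reader]  (a byte is waiting to be read)
writable # => [writer]  (the send buffer has room)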

def process_writable(socket, server, requests)
request = requests[server]
return unless request

# FIXME cache in server class? use a different value?
buffer_size = socket.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF).int

chunk = request[0..buffer_size]
written = server.request(:pipelined_get, chunk)
return if written == :wait_writable

new_request = requests[server][written..]

# FIXME check the error handling here. Looks wrong
if new_request.empty?
requests.delete(server)

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
servers.delete(server)
end
else
requests[server] = new_request
end
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
# FIXME: especially here. Nothing is raised and the server is not removed from servers.
end
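
The partial-write bookkeeping above follows the usual write_nonblock pattern: hand bytes to the kernel until the payload is drained, and wait for writability whenever the send buffer (the SO_SNDBUF value queried above) fills up. A self-contained sketch of that loop, assuming an already connected socket:

require 'socket'

def drain_nonblock(sock, payload)
  pending = payload.dup
  until pending.empty?
    written = sock.write_nonblock(pending, exception: false)
    if written == :wait_writable
      IO.select(nil, [sock]) # block until the socket can accept more bytes
    else
      pending = pending[written..]
    end
  end
end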

def remaining_time(start, timeout)
elapsed = Time.now - start
return 0 if elapsed > timeout
@@ -144,23 +170,6 @@ def process_server(server)
server.pipeline_complete?
end

def servers_with_response(servers, timeout)
return [] if servers.empty?

# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, = IO.select(server_map.keys, nil, nil, timeout)
return [] if readable.nil?

readable.map { |sock| server_map[sock] }
end

def groups_for_keys(*keys)
keys.flatten!
keys.map! { |a| @key_manager.validate_key(a.to_s) }
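
For context, PipelinedGetter is the machinery behind Dalli::Client#get_multi. A minimal usage sketch, assuming a memcached instance listening on localhost:11211:

require 'dalli'

client = Dalli::Client.new('localhost:11211')
client.set('a', 1)
client.set('b', 2)
client.get_multi('a', 'b') # => { 'a' => 1, 'b' => 2 }
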
30 changes: 17 additions & 13 deletions lib/dalli/protocol/base.rb
@@ -18,7 +18,7 @@ class Base

def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default?
def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout,
:socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error
:socket_type, :up!, :down!, :write, :write_nonblock, :reconnect_down_server?, :raise_down_error

def initialize(attribs, client_options = {})
hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
@@ -59,16 +59,17 @@ def lock!; end

def unlock!; end

# Start reading key/value pairs from this connection. This is usually called
# after a series of GETKQ commands. A NOOP is sent, and the server begins
# flushing responses for kv pairs that were found.
# Get ready to read key/value pairs from this connection.
# This is usually called before or after the first GETKQ command.
#
# Returns nothing.
def pipeline_response_setup
verify_state(:getkq)
write_noop
response_buffer.reset
@connection_manager.start_request!
end

def finish_pipeline_request
write_noop
end

# Attempt to receive and parse as many key/value pairs as possible
@@ -143,6 +144,14 @@ def quiet?
end
alias multi? quiet?

def pipelined_get_request(keys)
req = +''
keys.each do |key|
req << quiet_get_request(key)
end
req
end

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe

private
@@ -201,13 +210,8 @@ def connect
raise
end

def pipelined_get(keys)
req = +''
keys.each do |key|
req << quiet_get_request(key)
end
# Could send noop here instead of in pipeline_response_setup
write(req)
def pipelined_get(bytes)
write_nonblock(bytes)
end

def response_buffer
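
Taken together, the changes in this file imply roughly the following per-server sequence. This is an illustrative sketch only (pipeline_one_server is a hypothetical helper; the real orchestration lives in PipelinedGetter#fetch_responses and #process_writable):

def pipeline_one_server(server, request)
  server.pipeline_response_setup                        # reset the response buffer, start the request
  until request.empty?
    written = server.request(:pipelined_get, request)   # write_nonblock under the hood
    next IO.select(nil, [server.sock]) if written == :wait_writable
    request = request[written..]
  end
  server.finish_pipeline_request                        # terminating noop; the server starts flushing hits
end
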
4 changes: 4 additions & 0 deletions lib/dalli/protocol/connection_manager.rb
@@ -169,6 +169,10 @@ def read_nonblock
@sock.read_available
end

def write_nonblock(bytes)
@sock.write_nonblock(bytes, exception: false)
end

def max_allowed_failures
@max_allowed_failures ||= @options[:socket_max_failures] || 2
end
