WIP: first attempt at #776 / #941 #942

Open · wants to merge 1 commit into base: main
173 changes: 93 additions & 80 deletions lib/dalli/pipelined_getter.rb
@@ -17,68 +17,26 @@ def process(keys, &block)
return {} if keys.empty?

@ring.lock do
servers = setup_requests(keys)
start_time = Time.now
servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty?
requests = setup_requests(keys)
fetch_responses(requests, @ring.socket_timeout, &block)
end
rescue NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { 'retrying pipelined gets because of timeout' }
retry
end

def setup_requests(keys)
groups = groups_for_keys(keys)
make_getkq_requests(groups)

# TODO: How does this exit on a NetworkError
finish_queries(groups.keys)
end

##
# Loop through the server-grouped sets of keys, writing
# the corresponding getkq requests to the appropriate servers
#
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
##
def make_getkq_requests(groups)
groups.each do |server, keys_for_server|
server.request(:pipelined_get, keys_for_server)
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
def setup_requests(all_keys)
groups_for_keys(all_keys).to_h do |server, keys|
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
[server, server.pipelined_get_request(keys)]
end
end

##
# This loops through the servers that have keys in
# our set, sending the noop to terminate the set of queries.
##
def finish_queries(servers)
deleted = []

servers.each do |server|
next unless server.alive?

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
deleted.append(server)
end
end

servers.delete_if { |server| deleted.include?(server) }
rescue Dalli::NetworkError
abort_without_timeout(servers)
raise
end

def finish_query_for_server(server)
server.pipeline_response_setup
server.finish_pipeline_request
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError => e
@@ -92,29 +50,101 @@ def abort_without_timeout(servers)
servers.each(&:pipeline_abort)
end

def fetch_responses(servers, start_time, timeout, &block)
def fetch_responses(requests, timeout, &block)
# FIXME: this was here. why. where should it go?
# Remove any servers which are not connected
servers.delete_if { |s| !s.connected? }
return [] if servers.empty?
# servers.delete_if { |s| !s.connected? }

start_time = Time.now
servers = requests.keys

# FIXME: this was executed before the finish request was sent. Why?
Owner: I have to look at this in a little detail. The behavior in the case of no servers available is supposed to be a returned [] with no error. But if I recall correctly, there are cases where a server that is not connected can trigger raising a Dalli::Error without this check. I'll find some time to look.

Author: Any hints on the intended original behavior of the multi-server code are highly appreciated. I haven't been able to fully wrap my head around it yet, and I guess it is hardest to get right when things fail in between reading/writing chunks. It is hard to test correctly as well.
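A minimal sketch of the guard under discussion, assuming the quiet-return intent described above; the placement inside fetch_responses and the exact form are illustrative, not part of this PR:

```ruby
# Hypothetical guard, not in the PR: drop servers whose sockets are no
# longer connected before selecting on them, and return [] quietly when
# none remain rather than raising a Dalli::Error further down.
servers = servers.select(&:connected?)
return [] if servers.empty?
```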

servers.delete_if { |s| !s.alive? }

# could be postponed to after the first write
servers.each(&:pipeline_response_setup)

time_left = remaining_time(start_time, timeout)
readable_servers = servers_with_response(servers, time_left)
if readable_servers.empty?
until servers.empty?
time_left = remaining_time(start_time, timeout)
servers = read_write_select(servers, requests, time_left, &block)
end
rescue NetworkError
# Abort and raise if we encountered a network error. This triggers
# a retry at the top level.
abort_without_timeout(servers)
raise
end

def read_write_select(servers, requests, time_left, &block)
# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, writable, = IO.select(server_map.keys, server_map.keys,
nil, time_left)

if readable.nil?
abort_with_timeout(servers)
return []
end

# Loop through the servers with responses, and
# delete any from our list that are finished
readable_servers.each do |server|
writable.each do |socket|
server = server_map[socket]
process_writable(server, servers, requests)
end

readable.each do |socket|
server = server_map[socket]

servers.delete(server) if process_server(server, &block)
end

servers
rescue NetworkError
# Re-raise so fetch_responses can abort the remaining servers and
# trigger a retry at the top level; swallowing the error here would
# return nil to the caller's loop.
raise
end

def process_writable(server, servers, requests)
request = requests[server]
return unless request

new_request = server_pipelined_get(server, request)

if new_request.empty?
requests.delete(server)

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
servers.delete(server)
end
else
requests[server] = new_request
end
rescue Dalli::NetworkError
abort_without_timeout(servers)
raise
rescue DalliError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
end

def server_pipelined_get(server, request)
buffer_size = server.socket_sndbuf
Owner: I had been thinking about doing it by batching the number of keys, but I think this approach of looking at it from the perspective of number of bytes is a better way to address the issue. It also allows us to give the end user direction about how to control the behavior (by tuning sndbuf).

Author: I started out with batching the keys, but I got the idea to batch by bytes instead when I struggled to come up with a good default value for the batch size: it seemed to me that the original problem was caused by the response size rather than the number of keys requested.

I don't know if sndbuf is a good value though; maybe it is more efficient to try to write larger batches than the buffer. It also will not allow users of the library to tweak the send buffer and the chunk size independently (I do not know if that makes sense).

It might not matter at all, as this uses nonblocking IO for the writes, which could mean there would just be partial writes (which are already handled correctly, AFAIK) if we try to write everything at once. Maybe I will do some experiments, but I think there are other areas I should polish first (basic correctness being the most important one).

Owner: I think we might do something like this (see the sketch below):

  1. Add an explicit chunking parameter that can be set on the client
  2. Have that chunking parameter fall back to the sndbuf size if it isn't explicitly set
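A minimal sketch of that fallback, assuming a hypothetical :pipelined_chunk_size client option (the option name and placement are illustrative, not part of this PR):

```ruby
# Hypothetical option resolution: an explicit chunk size wins, otherwise
# fall back to the kernel's send buffer size for this socket.
def pipelined_chunk_size
  @pipelined_chunk_size ||=
    @options[:pipelined_chunk_size] || socket_sndbuf
end
```

This would let users tune the write chunk size independently of SO_SNDBUF, addressing the Author's concern above.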

chunk = request[0..buffer_size]
written = server.request(:pipelined_get, chunk)
return if written == :wait_writable

request[written..]
rescue Dalli::NetworkError
raise
rescue DalliError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
end

def remaining_time(start, timeout)
@@ -144,23 +174,6 @@ def process_server(server)
server.pipeline_complete?
end

def servers_with_response(servers, timeout)
return [] if servers.empty?

# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, = IO.select(server_map.keys, nil, nil, timeout)
return [] if readable.nil?

readable.map { |sock| server_map[sock] }
end

def groups_for_keys(*keys)
keys.flatten!
keys.map! { |a| @key_manager.validate_key(a.to_s) }
37 changes: 24 additions & 13 deletions lib/dalli/protocol/base.rb
@@ -18,7 +18,8 @@ class Base

def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default?
def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout,
:socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error
:socket_type, :socket_sndbuf, :up!, :down!, :write, :write_nonblock, :reconnect_down_server?,
:raise_down_error

def initialize(attribs, client_options = {})
hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
@@ -59,16 +60,17 @@ def lock!; end

def unlock!; end

# Start reading key/value pairs from this connection. This is usually called
# after a series of GETKQ commands. A NOOP is sent, and the server begins
# flushing responses for kv pairs that were found.
# Get ready to read key/value pairs from this connection.
# This is usually called before or after the first GETKQ command.
#
# Returns nothing.
def pipeline_response_setup
verify_state(:getkq)
write_noop
response_buffer.reset
@connection_manager.start_request!
end

def finish_pipeline_request
write_noop
end

# Attempt to receive and parse as many key/value pairs as possible
@@ -143,6 +145,14 @@ def quiet?
end
alias multi? quiet?

def pipelined_get_request(keys)
req = +String.new(capacity: pipelined_get_capacity(keys))
keys.each do |key|
req << quiet_get_request(key)
end
req
end

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe

private
@@ -201,13 +211,14 @@ def connect
raise
end

def pipelined_get(keys)
req = +''
keys.each do |key|
req << quiet_get_request(key)
end
# Could send noop here instead of in pipeline_response_setup
write(req)
def pipelined_get(bytes)
write_nonblock(bytes)
rescue SystemCallError, Timeout::Error, EOFError => e
@connection_manager.error_on_request!(e)
end

def pipelined_get_capacity(keys)
(keys.size * request_header_size) + keys.reduce(0) { |acc, k| acc + k.size }
end

def response_buffer
4 changes: 4 additions & 0 deletions lib/dalli/protocol/binary.rb
@@ -163,6 +163,10 @@ def write_noop
write(req)
end

def request_header_size
24
end

require_relative 'binary/request_formatter'
require_relative 'binary/response_header'
require_relative 'binary/response_processor'
10 changes: 10 additions & 0 deletions lib/dalli/protocol/connection_manager.rb
@@ -169,6 +169,10 @@ def read_nonblock
@sock.read_available
end

def write_nonblock(bytes)
@sock.write_nonblock(bytes, exception: false)
end

def max_allowed_failures
@max_allowed_failures ||= @options[:socket_max_failures] || 2
end
@@ -247,6 +251,12 @@ def log_up_detected
time = Time.now - @down_at
Dalli.logger.warn { format('%<name>s is back (downtime was %<time>.3f seconds)', name: name, time: time) }
end

def socket_sndbuf
@socket_sndbuf ||=
@sock&.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF)&.int ||
32_768
end
end
end
end
4 changes: 4 additions & 0 deletions lib/dalli/protocol/meta.rb
@@ -170,6 +170,10 @@ def authenticate_connection
raise Dalli::DalliError, 'Authentication not supported for the meta protocol.'
end

def request_header_size
17
end

require_relative 'meta/key_regularizer'
require_relative 'meta/request_formatter'
require_relative 'meta/response_processor'
21 changes: 21 additions & 0 deletions test/integration/test_pipelined_get.rb
@@ -79,6 +79,27 @@
end
end

it 'does not block for a large number of existing keys' do
memcached_persistent(p) do |dc|
dc.close
dc.flush

key_count = 200_000
range = 0...key_count
dc.quiet do
range.each { |i| dc.set(i, "foobar_#{i}") }
end

Timeout.timeout 60 do
resp = dc.get_multi(range.to_a)

assert_equal key_count, resp.count
end
rescue Timeout::Error
flunk "timed out while getting #{key_count} keys with get_multi"
end
end

describe 'pipeline_next_responses' do
it 'raises NetworkError when called before pipeline_response_setup' do
memcached_persistent(p) do |dc|