diff --git a/lib/celluloid/io/socket.rb b/lib/celluloid/io/socket.rb index 9bd369a..3b00c47 100644 --- a/lib/celluloid/io/socket.rb +++ b/lib/celluloid/io/socket.rb @@ -70,6 +70,32 @@ def self.try_convert(socket, convert_io = true) end end + private + + def perform_io + loop do + begin + result = yield + + case result + when :wait_readable then wait_readable + when :wait_writable then wait_writable + when NilClass then return :eof + else return result + end + rescue ::IO::WaitReadable + wait_readable + retry + rescue ::IO::WaitWritable, + Errno::EAGAIN + wait_writable + retry + end + end + rescue EOFError + :eof + end + class << self extend Forwardable def_delegators '::Socket', *(::Socket.methods - self.methods - [:try_convert]) diff --git a/lib/celluloid/io/stream.rb b/lib/celluloid/io/stream.rb index b9f6185..cd84cbc 100644 --- a/lib/celluloid/io/stream.rb +++ b/lib/celluloid/io/stream.rb @@ -38,12 +38,10 @@ def sysread(length = nil, buffer = nil) buffer ||= ''.force_encoding(Encoding::ASCII_8BIT) @read_latch.synchronize do - begin - read_nonblock(length, buffer) - rescue ::IO::WaitReadable - wait_readable - retry + op = perform_io do + read_nonblock(length, buffer) end + raise EOFError if op == :eof end buffer @@ -58,17 +56,10 @@ def syswrite(string) @write_latch.synchronize do while total_written < length - begin - written = write_nonblock(remaining) - rescue ::IO::WaitWritable - wait_writable - retry - rescue EOFError - return total_written - rescue Errno::EAGAIN - wait_writable - retry - end + written = perform_io do + write_nonblock(remaining) + end + return total_written if written == :eof total_written += written @@ -80,6 +71,20 @@ def syswrite(string) total_written end + # TODO: remove after ending ruby 2.0.0 support + if RUBY_VERSION >= "2.1" + def read_nonblock(*args, **options) + options[:exception] = false unless options.has_key?(:exception) + super(*args, **options) + end + + def write_nonblock(*args, **options) + options[:exception] = false unless options.has_key?(:exception) + super(*args, **options) + end + end + + # Reads +size+ bytes from the stream. If +buf+ is provided it must # reference a string which will receive the data. # diff --git a/lib/celluloid/io/udp_socket.rb b/lib/celluloid/io/udp_socket.rb index b2edc36..05e6783 100644 --- a/lib/celluloid/io/udp_socket.rb +++ b/lib/celluloid/io/udp_socket.rb @@ -31,18 +31,23 @@ def wait_readable; Celluloid::IO.wait_readable(self); end # MSG_ options. The first element of the results, mesg, is the data # received. The second element, sender_addrinfo, contains # protocol-specific address information of the sender. - def recvfrom(maxlen, flags = 0) - begin + if RUBY_VERSION >= "2.3" + def recvfrom(*args, **options) socket = to_io - if socket.respond_to? :recvfrom_nonblock - socket.recvfrom_nonblock(maxlen, flags) - else - # FIXME: hax for JRuby - socket.recvfrom(maxlen, flags) + options[:exception] = false unless options.has_key?(:exception) + perform_io { socket.recvfrom_nonblock(*args, **options) } + end + else + def recvfrom(*args) + socket = to_io + perform_io do + if socket.respond_to? :recvfrom_nonblock + socket.recvfrom_nonblock(*args) + else + # FIXME: hax for JRuby + socket.recvfrom(*args) + end end - rescue ::IO::WaitReadable - wait_readable - retry end end diff --git a/spec/celluloid/io/reactor_spec.rb b/spec/celluloid/io/reactor_spec.rb index fcbf386..62886a6 100644 --- a/spec/celluloid/io/reactor_spec.rb +++ b/spec/celluloid/io/reactor_spec.rb @@ -17,7 +17,7 @@ # Main server body: within_io_actor do begin - timeout(2) do + Timeout.timeout(2) do loop do socket.readpartial(2046) end diff --git a/spec/celluloid/io/tcp_socket_spec.rb b/spec/celluloid/io/tcp_socket_spec.rb index aaba86f..6cc59e8 100644 --- a/spec/celluloid/io/tcp_socket_spec.rb +++ b/spec/celluloid/io/tcp_socket_spec.rb @@ -188,7 +188,9 @@ with_connected_sockets(example_port) do |subject, peer| subject.sync = false within_io_actor { subject << payload } - expect{ peer.read_nonblock payload.length }.to raise_exception ::IO::WaitReadable + if RUBY_VERSION < "2.1" + expect{ peer.read_nonblock payload.length }.to raise_exception ::IO::WaitReadable + end within_io_actor { subject.close } expect(peer.read).to eq payload end