Buffer writes to the cable sockets

Otherwise, they can sometimes block, leading to reduced system
throughput.
This commit is contained in:
Matthew Draper 2016-09-28 06:44:23 +09:30
parent 21ecf42730
commit 5d92089bca
4 changed files with 100 additions and 12 deletions

@ -1,3 +1,8 @@
* Buffer writes to websocket connections, to avoid blocking threads
that could be doing more useful things.
*Matthew Draper*, *Tinco Andringa*
* Protect against concurrent writes to a websocket connection from * Protect against concurrent writes to a websocket connection from
multiple threads; the underlying OS write is not always threadsafe. multiple threads; the underlying OS write is not always threadsafe.

@ -14,6 +14,9 @@ def initialize(event_loop, socket)
@rack_hijack_io = nil @rack_hijack_io = nil
@write_lock = Mutex.new @write_lock = Mutex.new
@write_head = nil
@write_buffer = Queue.new
end end
def each(&callback) def each(&callback)
@ -30,14 +33,62 @@ def shutdown
end end
def write(data) def write(data)
@write_lock.synchronize do if @stream_send
return @rack_hijack_io.write(data) if @rack_hijack_io return @stream_send.call(data)
return @stream_send.call(data) if @stream_send
end end
if @write_lock.try_lock
begin
if @write_head.nil? && @write_buffer.empty?
written = @rack_hijack_io.write_nonblock(data, exception: false)
case written
when :wait_writable
# proceed below
when data.bytesize
return data.bytesize
else
@write_head = data.byteslice(written, data.bytesize)
@event_loop.writes_pending @rack_hijack_io
return data.bytesize
end
end
ensure
@write_lock.unlock
end
end
@write_buffer << data
@event_loop.writes_pending @rack_hijack_io
data.bytesize
rescue EOFError, Errno::ECONNRESET rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone @socket_object.client_gone
end end
def flush_write_buffer
@write_lock.synchronize do
loop do
if @write_head.nil?
return true if @write_buffer.empty?
@write_head = @write_buffer.pop
end
written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
case written
when :wait_writable
return false
when @write_head.bytesize
@write_head = nil
else
@write_head = @write_head.byteslice(written, @write_head.bytesize)
return false
end
end
end
end
def receive(data) def receive(data)
@socket_object.parse(data) @socket_object.parse(data)
end end

@ -5,7 +5,7 @@ module ActionCable
module Connection module Connection
class StreamEventLoop class StreamEventLoop
def initialize def initialize
@nio = @thread = nil @nio = @executor = @thread = nil
@map = {} @map = {}
@stopping = false @stopping = false
@todo = Queue.new @todo = Queue.new
@ -20,13 +20,14 @@ def timer(interval, &block)
def post(task = nil, &block) def post(task = nil, &block)
task ||= block task ||= block
Concurrent.global_io_executor << task spawn
@executor << task
end end
def attach(io, stream) def attach(io, stream)
@todo << lambda do @todo << lambda do
@map[io] = stream @map[io] = @nio.register(io, :r)
@nio.register(io, :r) @map[io].value = stream
end end
wakeup wakeup
end end
@ -39,6 +40,15 @@ def detach(io, stream)
wakeup wakeup
end end
def writes_pending(io)
@todo << lambda do
if monitor = @map[io]
monitor.interests = :rw
end
end
wakeup
end
def stop def stop
@stopping = true @stopping = true
wakeup if @nio wakeup if @nio
@ -52,6 +62,13 @@ def spawn
return if @thread && @thread.status return if @thread && @thread.status
@nio ||= NIO::Selector.new @nio ||= NIO::Selector.new
@executor ||= Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: 10,
max_queue: 0,
)
@thread = Thread.new { run } @thread = Thread.new { run }
return true return true
@ -77,12 +94,25 @@ def run
monitors.each do |monitor| monitors.each do |monitor|
io = monitor.io io = monitor.io
stream = @map[io] stream = monitor.value
begin begin
stream.receive io.read_nonblock(4096) if monitor.writable?
rescue IO::WaitReadable if stream.flush_write_buffer
next monitor.interests = :r
end
next unless monitor.readable?
end
incoming = io.read_nonblock(4096, exception: false)
case incoming
when :wait_readable
next
when nil
stream.close
else
stream.receive incoming
end
rescue rescue
# We expect one of EOFError or Errno::ECONNRESET in # We expect one of EOFError or Errno::ECONNRESET in
# normal operation (when the client goes away). But if # normal operation (when the client goes away). But if

@ -28,7 +28,9 @@ def event_loop
@event_loop ||= if @config.use_faye @event_loop ||= if @config.use_faye
ActionCable::Connection::FayeEventLoop.new ActionCable::Connection::FayeEventLoop.new
else else
ActionCable::Connection::StreamEventLoop.new ActionCable::Connection::StreamEventLoop.new.tap do |loop|
loop.instance_variable_set(:@executor, Concurrent.global_io_executor)
end
end end
end end