Merge pull request #26646 from matthewd/cable-buffer
Buffer writes to the cable sockets
This commit is contained in:
commit
8356dcac9b
@ -1,3 +1,8 @@
|
||||
* Buffer writes to websocket connections, to avoid blocking threads
|
||||
that could be doing more useful things.
|
||||
|
||||
*Matthew Draper*, *Tinco Andringa*
|
||||
|
||||
* Protect against concurrent writes to a websocket connection from
|
||||
multiple threads; the underlying OS write is not always threadsafe.
|
||||
|
||||
|
@ -14,6 +14,9 @@ def initialize(event_loop, socket)
|
||||
|
||||
@rack_hijack_io = nil
|
||||
@write_lock = Mutex.new
|
||||
|
||||
@write_head = nil
|
||||
@write_buffer = Queue.new
|
||||
end
|
||||
|
||||
def each(&callback)
|
||||
@ -30,14 +33,62 @@ def shutdown
|
||||
end
|
||||
|
||||
def write(data)
|
||||
@write_lock.synchronize do
|
||||
return @rack_hijack_io.write(data) if @rack_hijack_io
|
||||
return @stream_send.call(data) if @stream_send
|
||||
if @stream_send
|
||||
return @stream_send.call(data)
|
||||
end
|
||||
|
||||
if @write_lock.try_lock
|
||||
begin
|
||||
if @write_head.nil? && @write_buffer.empty?
|
||||
written = @rack_hijack_io.write_nonblock(data, exception: false)
|
||||
|
||||
case written
|
||||
when :wait_writable
|
||||
# proceed below
|
||||
when data.bytesize
|
||||
return data.bytesize
|
||||
else
|
||||
@write_head = data.byteslice(written, data.bytesize)
|
||||
@event_loop.writes_pending @rack_hijack_io
|
||||
|
||||
return data.bytesize
|
||||
end
|
||||
end
|
||||
ensure
|
||||
@write_lock.unlock
|
||||
end
|
||||
end
|
||||
|
||||
@write_buffer << data
|
||||
@event_loop.writes_pending @rack_hijack_io
|
||||
|
||||
data.bytesize
|
||||
rescue EOFError, Errno::ECONNRESET
|
||||
@socket_object.client_gone
|
||||
end
|
||||
|
||||
def flush_write_buffer
|
||||
@write_lock.synchronize do
|
||||
loop do
|
||||
if @write_head.nil?
|
||||
return true if @write_buffer.empty?
|
||||
@write_head = @write_buffer.pop
|
||||
end
|
||||
|
||||
written = @rack_hijack_io.write_nonblock(@write_head, exception: false)
|
||||
case written
|
||||
when :wait_writable
|
||||
return false
|
||||
when @write_head.bytesize
|
||||
@write_head = nil
|
||||
else
|
||||
@write_head = @write_head.byteslice(written, @write_head.bytesize)
|
||||
return false
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def receive(data)
|
||||
@socket_object.parse(data)
|
||||
end
|
||||
|
@ -5,7 +5,7 @@ module ActionCable
|
||||
module Connection
|
||||
class StreamEventLoop
|
||||
def initialize
|
||||
@nio = @thread = nil
|
||||
@nio = @executor = @thread = nil
|
||||
@map = {}
|
||||
@stopping = false
|
||||
@todo = Queue.new
|
||||
@ -20,13 +20,14 @@ def timer(interval, &block)
|
||||
def post(task = nil, &block)
|
||||
task ||= block
|
||||
|
||||
Concurrent.global_io_executor << task
|
||||
spawn
|
||||
@executor << task
|
||||
end
|
||||
|
||||
def attach(io, stream)
|
||||
@todo << lambda do
|
||||
@map[io] = stream
|
||||
@nio.register(io, :r)
|
||||
@map[io] = @nio.register(io, :r)
|
||||
@map[io].value = stream
|
||||
end
|
||||
wakeup
|
||||
end
|
||||
@ -39,6 +40,15 @@ def detach(io, stream)
|
||||
wakeup
|
||||
end
|
||||
|
||||
def writes_pending(io)
|
||||
@todo << lambda do
|
||||
if monitor = @map[io]
|
||||
monitor.interests = :rw
|
||||
end
|
||||
end
|
||||
wakeup
|
||||
end
|
||||
|
||||
def stop
|
||||
@stopping = true
|
||||
wakeup if @nio
|
||||
@ -52,6 +62,13 @@ def spawn
|
||||
return if @thread && @thread.status
|
||||
|
||||
@nio ||= NIO::Selector.new
|
||||
|
||||
@executor ||= Concurrent::ThreadPoolExecutor.new(
|
||||
min_threads: 1,
|
||||
max_threads: 10,
|
||||
max_queue: 0,
|
||||
)
|
||||
|
||||
@thread = Thread.new { run }
|
||||
|
||||
return true
|
||||
@ -77,12 +94,25 @@ def run
|
||||
|
||||
monitors.each do |monitor|
|
||||
io = monitor.io
|
||||
stream = @map[io]
|
||||
stream = monitor.value
|
||||
|
||||
begin
|
||||
stream.receive io.read_nonblock(4096)
|
||||
rescue IO::WaitReadable
|
||||
next
|
||||
if monitor.writable?
|
||||
if stream.flush_write_buffer
|
||||
monitor.interests = :r
|
||||
end
|
||||
next unless monitor.readable?
|
||||
end
|
||||
|
||||
incoming = io.read_nonblock(4096, exception: false)
|
||||
case incoming
|
||||
when :wait_readable
|
||||
next
|
||||
when nil
|
||||
stream.close
|
||||
else
|
||||
stream.receive incoming
|
||||
end
|
||||
rescue
|
||||
# We expect one of EOFError or Errno::ECONNRESET in
|
||||
# normal operation (when the client goes away). But if
|
||||
|
@ -28,7 +28,9 @@ def event_loop
|
||||
@event_loop ||= if @config.use_faye
|
||||
ActionCable::Connection::FayeEventLoop.new
|
||||
else
|
||||
ActionCable::Connection::StreamEventLoop.new
|
||||
ActionCable::Connection::StreamEventLoop.new.tap do |loop|
|
||||
loop.instance_variable_set(:@executor, Concurrent.global_io_executor)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user