Import the relevant portions of faye-websocket
(as adapted to use concurrent-ruby / nio4r instead of eventmachine)
This commit is contained in:
parent
68a9060d02
commit
322dca293b
2
Gemfile
2
Gemfile
@ -66,8 +66,6 @@ group :cable do
|
||||
|
||||
gem 'em-hiredis', require: false
|
||||
gem 'redis', require: false
|
||||
|
||||
gem 'faye-websocket', github: "matthewd/faye-websocket-ruby", branch: "no-em-concept", require: false
|
||||
end
|
||||
|
||||
# Add your own local bundler stuff.
|
||||
|
13
Gemfile.lock
13
Gemfile.lock
@ -19,16 +19,6 @@ GIT
|
||||
qu (= 0.2.0)
|
||||
redis-namespace
|
||||
|
||||
GIT
|
||||
remote: git://github.com/matthewd/faye-websocket-ruby.git
|
||||
revision: f608bb57844b91b397817b06ab6bf744324010ab
|
||||
branch: no-em-concept
|
||||
specs:
|
||||
faye-websocket (0.10.2)
|
||||
concurrent-ruby (~> 1.0)
|
||||
nio4r (~> 1.2)
|
||||
websocket-driver (>= 0.5.1)
|
||||
|
||||
GIT
|
||||
remote: git://github.com/sass/sass.git
|
||||
revision: bce9509f396225d721501ea1070a6871b708abb1
|
||||
@ -42,7 +32,7 @@ PATH
|
||||
actioncable (5.0.0.beta1)
|
||||
actionpack (= 5.0.0.beta1)
|
||||
coffee-rails (~> 4.1.0)
|
||||
faye-websocket (~> 0.10.0)
|
||||
nio4r (~> 1.2)
|
||||
websocket-driver (~> 0.6.1)
|
||||
actionmailer (5.0.0.beta1)
|
||||
actionpack (= 5.0.0.beta1)
|
||||
@ -314,7 +304,6 @@ DEPENDENCIES
|
||||
delayed_job
|
||||
delayed_job_active_record
|
||||
em-hiredis
|
||||
faye-websocket!
|
||||
jquery-rails
|
||||
json
|
||||
kindlerb (= 0.1.1)
|
||||
|
@ -21,7 +21,7 @@
|
||||
s.add_dependency 'actionpack', version
|
||||
|
||||
s.add_dependency 'coffee-rails', '~> 4.1.0'
|
||||
s.add_dependency 'faye-websocket', '~> 0.10.0'
|
||||
s.add_dependency 'nio4r', '~> 1.2'
|
||||
s.add_dependency 'websocket-driver', '~> 0.6.1'
|
||||
|
||||
s.add_development_dependency 'em-hiredis', '~> 0.3.0'
|
||||
|
@ -5,12 +5,15 @@ module Connection
|
||||
eager_autoload do
|
||||
autoload :Authorization
|
||||
autoload :Base
|
||||
autoload :ClientSocket
|
||||
autoload :Identification
|
||||
autoload :InternalChannel
|
||||
autoload :MessageBuffer
|
||||
autoload :WebSocket
|
||||
autoload :Stream
|
||||
autoload :StreamEventLoop
|
||||
autoload :Subscriptions
|
||||
autoload :TaggedLoggerProxy
|
||||
autoload :WebSocket
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -49,14 +49,14 @@ class Base
|
||||
include Authorization
|
||||
|
||||
attr_reader :server, :env, :subscriptions, :logger
|
||||
delegate :worker_pool, :pubsub, to: :server
|
||||
delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
|
||||
|
||||
def initialize(server, env)
|
||||
@server, @env = server, env
|
||||
|
||||
@logger = new_tagged_logger
|
||||
|
||||
@websocket = ActionCable::Connection::WebSocket.new(env)
|
||||
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
|
||||
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
|
||||
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
|
||||
|
||||
@ -70,10 +70,6 @@ def process
|
||||
logger.info started_request_message
|
||||
|
||||
if websocket.possible? && allow_request_origin?
|
||||
websocket.on(:open) { |event| send_async :on_open }
|
||||
websocket.on(:message) { |event| on_message event.data }
|
||||
websocket.on(:close) { |event| send_async :on_close }
|
||||
|
||||
respond_to_successful_request
|
||||
else
|
||||
respond_to_invalid_request
|
||||
@ -121,6 +117,22 @@ def beat
|
||||
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
|
||||
end
|
||||
|
||||
def on_open # :nodoc:
|
||||
send_async :handle_open
|
||||
end
|
||||
|
||||
def on_message(message) # :nodoc:
|
||||
message_buffer.append message
|
||||
end
|
||||
|
||||
def on_error(message) # :nodoc:
|
||||
# ignore
|
||||
end
|
||||
|
||||
def on_close # :nodoc:
|
||||
send_async :handle_close
|
||||
end
|
||||
|
||||
protected
|
||||
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
|
||||
def request
|
||||
@ -139,7 +151,7 @@ def cookies
|
||||
attr_reader :message_buffer
|
||||
|
||||
private
|
||||
def on_open
|
||||
def handle_open
|
||||
connect if respond_to?(:connect)
|
||||
subscribe_to_internal_channel
|
||||
beat
|
||||
@ -150,11 +162,7 @@ def on_open
|
||||
respond_to_invalid_request
|
||||
end
|
||||
|
||||
def on_message(message)
|
||||
message_buffer.append message
|
||||
end
|
||||
|
||||
def on_close
|
||||
def handle_close
|
||||
logger.info finished_request_message
|
||||
|
||||
server.remove_connection(self)
|
||||
|
152
actioncable/lib/action_cable/connection/client_socket.rb
Normal file
152
actioncable/lib/action_cable/connection/client_socket.rb
Normal file
@ -0,0 +1,152 @@
|
||||
require 'websocket/driver'
|
||||
|
||||
module ActionCable
|
||||
module Connection
|
||||
#--
|
||||
# This class is heavily based on faye-websocket-ruby
|
||||
#
|
||||
# Copyright (c) 2010-2015 James Coglan
|
||||
class ClientSocket # :nodoc:
|
||||
def self.determine_url(env)
|
||||
scheme = secure_request?(env) ? 'wss:' : 'ws:'
|
||||
"#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
|
||||
end
|
||||
|
||||
def self.secure_request?(env)
|
||||
return true if env['HTTPS'] == 'on'
|
||||
return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
|
||||
return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
|
||||
return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
|
||||
return true if env['rack.url_scheme'] == 'https'
|
||||
|
||||
return false
|
||||
end
|
||||
|
||||
CONNECTING = 0
|
||||
OPEN = 1
|
||||
CLOSING = 2
|
||||
CLOSED = 3
|
||||
|
||||
attr_reader :env, :url
|
||||
|
||||
def initialize(env, event_target, stream_event_loop)
|
||||
@env = env
|
||||
@event_target = event_target
|
||||
@stream_event_loop = stream_event_loop
|
||||
|
||||
@url = ClientSocket.determine_url(@env)
|
||||
|
||||
@driver = @driver_started = nil
|
||||
|
||||
@ready_state = CONNECTING
|
||||
|
||||
# The driver calls +env+, +url+, and +write+
|
||||
@driver = ::WebSocket::Driver.rack(self)
|
||||
|
||||
@driver.on(:open) { |e| open }
|
||||
@driver.on(:message) { |e| receive_message(e.data) }
|
||||
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
|
||||
@driver.on(:error) { |e| emit_error(e.message) }
|
||||
|
||||
@stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
|
||||
|
||||
if callback = @env['async.callback']
|
||||
callback.call([101, {}, @stream])
|
||||
end
|
||||
end
|
||||
|
||||
def start_driver
|
||||
return if @driver.nil? || @driver_started
|
||||
@driver_started = true
|
||||
@driver.start
|
||||
end
|
||||
|
||||
def rack_response
|
||||
start_driver
|
||||
[ -1, {}, [] ]
|
||||
end
|
||||
|
||||
def write(data)
|
||||
@stream.write(data)
|
||||
end
|
||||
|
||||
def transmit(message)
|
||||
return false if @ready_state > OPEN
|
||||
case message
|
||||
when Numeric then @driver.text(message.to_s)
|
||||
when String then @driver.text(message)
|
||||
when Array then @driver.binary(message)
|
||||
else false
|
||||
end
|
||||
end
|
||||
|
||||
def close(code = nil, reason = nil)
|
||||
code ||= 1000
|
||||
reason ||= ''
|
||||
|
||||
unless code == 1000 or (code >= 3000 and code <= 4999)
|
||||
raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
|
||||
"The code must be either 1000, or between 3000 and 4999. " +
|
||||
"#{code} is neither."
|
||||
end
|
||||
|
||||
@ready_state = CLOSING unless @ready_state == CLOSED
|
||||
@driver.close(reason, code)
|
||||
end
|
||||
|
||||
def parse(data)
|
||||
@driver.parse(data)
|
||||
end
|
||||
|
||||
def client_gone
|
||||
finalize_close
|
||||
end
|
||||
|
||||
def alive?
|
||||
@ready_state == OPEN
|
||||
end
|
||||
|
||||
private
|
||||
def open
|
||||
return unless @ready_state == CONNECTING
|
||||
@ready_state = OPEN
|
||||
|
||||
@event_target.on_open
|
||||
end
|
||||
|
||||
def receive_message(data)
|
||||
return unless @ready_state == OPEN
|
||||
|
||||
@event_target.on_message(data)
|
||||
end
|
||||
|
||||
def emit_error(message)
|
||||
return if @ready_state >= CLOSING
|
||||
|
||||
@event_target.on_error(message)
|
||||
end
|
||||
|
||||
def begin_close(reason, code)
|
||||
return if @ready_state == CLOSED
|
||||
@ready_state = CLOSING
|
||||
@close_params = [reason, code]
|
||||
|
||||
if @stream
|
||||
@stream.shutdown
|
||||
else
|
||||
finalize_close
|
||||
end
|
||||
end
|
||||
|
||||
def finalize_close
|
||||
return if @ready_state == CLOSED
|
||||
@ready_state = CLOSED
|
||||
|
||||
reason = @close_params ? @close_params[0] : ''
|
||||
code = @close_params ? @close_params[1] : 1006
|
||||
|
||||
@event_target.on_close(code, reason)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
59
actioncable/lib/action_cable/connection/stream.rb
Normal file
59
actioncable/lib/action_cable/connection/stream.rb
Normal file
@ -0,0 +1,59 @@
|
||||
module ActionCable
|
||||
module Connection
|
||||
#--
|
||||
# This class is heavily based on faye-websocket-ruby
|
||||
#
|
||||
# Copyright (c) 2010-2015 James Coglan
|
||||
class Stream
|
||||
def initialize(event_loop, socket)
|
||||
@event_loop = event_loop
|
||||
@socket_object = socket
|
||||
@stream_send = socket.env['stream.send']
|
||||
|
||||
@rack_hijack_io = nil
|
||||
|
||||
hijack_rack_socket
|
||||
end
|
||||
|
||||
def each(&callback)
|
||||
@stream_send ||= callback
|
||||
end
|
||||
|
||||
def close
|
||||
shutdown
|
||||
@socket_object.client_gone
|
||||
end
|
||||
|
||||
def shutdown
|
||||
clean_rack_hijack
|
||||
end
|
||||
|
||||
def write(data)
|
||||
return @rack_hijack_io.write(data) if @rack_hijack_io
|
||||
return @stream_send.call(data) if @stream_send
|
||||
rescue EOFError
|
||||
@socket_object.client_gone
|
||||
end
|
||||
|
||||
def receive(data)
|
||||
@socket_object.parse(data)
|
||||
end
|
||||
|
||||
private
|
||||
def hijack_rack_socket
|
||||
return unless @socket_object.env['rack.hijack']
|
||||
|
||||
@socket_object.env['rack.hijack'].call
|
||||
@rack_hijack_io = @socket_object.env['rack.hijack_io']
|
||||
|
||||
@event_loop.attach(@rack_hijack_io, self)
|
||||
end
|
||||
|
||||
def clean_rack_hijack
|
||||
return unless @rack_hijack_io
|
||||
@event_loop.detach(@rack_hijack_io, self)
|
||||
@rack_hijack_io = nil
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
68
actioncable/lib/action_cable/connection/stream_event_loop.rb
Normal file
68
actioncable/lib/action_cable/connection/stream_event_loop.rb
Normal file
@ -0,0 +1,68 @@
|
||||
require 'nio'
|
||||
|
||||
module ActionCable
|
||||
module Connection
|
||||
class StreamEventLoop
|
||||
def initialize
|
||||
@nio = NIO::Selector.new
|
||||
@map = {}
|
||||
@stopping = false
|
||||
@todo = Queue.new
|
||||
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
run
|
||||
end
|
||||
end
|
||||
|
||||
def attach(io, stream)
|
||||
@todo << lambda do
|
||||
@map[io] = stream
|
||||
@nio.register(io, :r)
|
||||
end
|
||||
@nio.wakeup
|
||||
end
|
||||
|
||||
def detach(io, stream)
|
||||
@todo << lambda do
|
||||
@nio.deregister(io)
|
||||
@map.delete io
|
||||
end
|
||||
@nio.wakeup
|
||||
end
|
||||
|
||||
def stop
|
||||
@stopping = true
|
||||
@nio.wakeup
|
||||
end
|
||||
|
||||
def run
|
||||
loop do
|
||||
if @stopping
|
||||
@nio.close
|
||||
break
|
||||
end
|
||||
|
||||
until @todo.empty?
|
||||
@todo.pop(true).call
|
||||
end
|
||||
|
||||
if monitors = @nio.select
|
||||
monitors.each do |monitor|
|
||||
io = monitor.io
|
||||
stream = @map[io]
|
||||
|
||||
begin
|
||||
stream.receive io.read_nonblock(4096)
|
||||
rescue IO::WaitReadable
|
||||
next
|
||||
rescue EOFError
|
||||
stream.close
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
@ -1,13 +1,11 @@
|
||||
require 'faye/websocket'
|
||||
require 'websocket/driver'
|
||||
|
||||
module ActionCable
|
||||
module Connection
|
||||
# Decorate the Faye::WebSocket with helpers we need.
|
||||
# Wrap the real socket to minimize the externally-presented API
|
||||
class WebSocket
|
||||
delegate :rack_response, :close, :on, to: :websocket
|
||||
|
||||
def initialize(env)
|
||||
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
|
||||
def initialize(env, event_target, stream_event_loop)
|
||||
@websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
|
||||
end
|
||||
|
||||
def possible?
|
||||
@ -15,11 +13,19 @@ def possible?
|
||||
end
|
||||
|
||||
def alive?
|
||||
websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
|
||||
websocket && websocket.alive?
|
||||
end
|
||||
|
||||
def transmit(data)
|
||||
websocket.send data
|
||||
websocket.transmit data
|
||||
end
|
||||
|
||||
def close
|
||||
websocket.close
|
||||
end
|
||||
|
||||
def rack_response
|
||||
websocket.rack_response
|
||||
end
|
||||
|
||||
protected
|
||||
|
@ -32,6 +32,10 @@ def remote_connections
|
||||
@remote_connections ||= RemoteConnections.new(self)
|
||||
end
|
||||
|
||||
def stream_event_loop
|
||||
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
|
||||
end
|
||||
|
||||
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
|
||||
def worker_pool
|
||||
@worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
|
||||
|
@ -55,11 +55,11 @@ def send_async(method, *args)
|
||||
test "on connection open" do
|
||||
run_in_eventmachine do
|
||||
connection = open_connection
|
||||
connection.process
|
||||
|
||||
connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
|
||||
connection.message_buffer.expects(:process!)
|
||||
|
||||
connection.process
|
||||
wait_for_async
|
||||
|
||||
assert_equal [ connection ], @server.connections
|
||||
@ -74,11 +74,11 @@ def send_async(method, *args)
|
||||
|
||||
# Setup the connection
|
||||
Concurrent::TimerTask.stubs(:new).returns(true)
|
||||
connection.send :on_open
|
||||
connection.send :handle_open
|
||||
assert connection.connected
|
||||
|
||||
connection.subscriptions.expects(:unsubscribe_from_all)
|
||||
connection.send :on_close
|
||||
connection.send :handle_close
|
||||
|
||||
assert ! connection.connected
|
||||
assert_equal [], @server.connections
|
||||
|
@ -68,10 +68,10 @@ def open_connection(server:)
|
||||
@connection = Connection.new(server, env)
|
||||
|
||||
@connection.process
|
||||
@connection.send :on_open
|
||||
@connection.send :handle_open
|
||||
end
|
||||
|
||||
def close_connection
|
||||
@connection.send :on_close
|
||||
@connection.send :handle_close
|
||||
end
|
||||
end
|
||||
|
@ -32,10 +32,10 @@ def open_connection(server:)
|
||||
@connection = Connection.new(server, env)
|
||||
|
||||
@connection.process
|
||||
@connection.send :on_open
|
||||
@connection.send :handle_open
|
||||
end
|
||||
|
||||
def close_connection
|
||||
@connection.send :on_close
|
||||
@connection.send :handle_close
|
||||
end
|
||||
end
|
||||
|
@ -14,6 +14,7 @@ def pubsub
|
||||
@config.subscription_adapter.new(self)
|
||||
end
|
||||
|
||||
def send_async
|
||||
def stream_event_loop
|
||||
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user