Support faye-websocket + EventMachine as an option

This commit is contained in:
Matthew Draper 2016-03-02 11:20:19 +10:30
parent 541e4abb4b
commit a373be9da4
31 changed files with 229 additions and 52 deletions

@ -17,6 +17,7 @@ env:
- "GEM=railties"
- "GEM=ap"
- "GEM=ac"
- "GEM=ac FAYE=1"
- "GEM=am,amo,as,av,aj"
- "GEM=ar:mysql2"
- "GEM=ar:sqlite3"

@ -131,7 +131,7 @@ GEM
erubis (2.7.0)
eventmachine (1.0.9.1)
execjs (2.6.0)
faye-websocket (0.10.2)
faye-websocket (0.10.3)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
ffi (1.9.10)

@ -27,7 +27,7 @@ def active_periodic_timers
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
active_periodic_timers << connection.server.event_loop.timer(options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end

@ -79,7 +79,7 @@ def stream_from(broadcasting, callback = nil)
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
Concurrent.global_io_executor.post do
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"

@ -8,6 +8,8 @@ module Connection
autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
autoload :FayeClientSocket
autoload :FayeEventLoop
autoload :MessageBuffer
autoload :Stream
autoload :StreamEventLoop

@ -49,7 +49,7 @@ class Base
include Authorization
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
delegate :stream_event_loop, :pubsub, to: :server
delegate :event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@ -57,7 +57,7 @@ def initialize(server, env)
@worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)

@ -29,10 +29,10 @@ def self.secure_request?(env)
attr_reader :env, :url
def initialize(env, event_target, stream_event_loop)
@env = env
@event_target = event_target
@stream_event_loop = stream_event_loop
def initialize(env, event_target, event_loop)
@env = env
@event_target = event_target
@event_loop = event_loop
@url = ClientSocket.determine_url(@env)
@ -49,7 +49,7 @@ def initialize(env, event_target, stream_event_loop)
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }
@stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
@stream = ActionCable::Connection::Stream.new(@event_loop, self)
end
def start_driver

@ -0,0 +1,42 @@
require 'faye/websocket'
module ActionCable
module Connection
class FayeClientSocket
def initialize(env, event_target, stream_event_loop)
@env = env
@event_target = event_target
@faye = nil
end
def alive?
@faye && @faye.ready_state == Faye::WebSocket::API::OPEN
end
def transmit(data)
connect
@faye.send data
end
def close
@faye && @faye.close
end
def rack_response
connect
@faye.rack_response
end
private
def connect
return if @faye
@faye = Faye::WebSocket.new(@env)
@faye.on(:open) { |event| @event_target.on_open }
@faye.on(:message) { |event| @event_target.on_message(event.data) }
@faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) }
end
end
end
end

@ -0,0 +1,44 @@
require 'thread'
require 'eventmachine'
EventMachine.epoll if EventMachine.epoll?
EventMachine.kqueue if EventMachine.kqueue?
module ActionCable
module Connection
class FayeEventLoop
@@mutex = Mutex.new
def timer(interval, &block)
ensure_reactor_running
EMTimer.new(::EM::PeriodicTimer.new(interval, &block))
end
def post(task = nil, &block)
task ||= block
ensure_reactor_running
::EM.next_tick(&task)
end
private
def ensure_reactor_running
return if EventMachine.reactor_running?
@@mutex.synchronize do
Thread.new { EventMachine.run } unless EventMachine.reactor_running?
Thread.pass until EventMachine.reactor_running?
end
end
class EMTimer
def initialize(inner)
@inner = inner
end
def shutdown
inner.cancel
end
end
end
end
end

@ -15,14 +15,14 @@ def subscribe_to_internal_channel
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
server.event_loop.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
@_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
@_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } }
end
end

@ -11,7 +11,16 @@ def initialize
@todo = Queue.new
@spawn_mutex = Mutex.new
spawn
end
def timer(interval, &block)
Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
end
def post(task = nil, &block)
task ||= block
Concurrent.global_io_executor << task
end
def attach(io, stream)

@ -4,8 +4,8 @@ module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
class WebSocket
def initialize(env, event_target, stream_event_loop)
@websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
def initialize(env, event_target, event_loop, client_socket_class)
@websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil
end
def possible?

@ -1,4 +1,4 @@
require 'thread'
require 'monitor'
module ActionCable
module Server
@ -18,8 +18,8 @@ def self.logger; config.logger; end
attr_reader :mutex
def initialize
@mutex = Mutex.new
@remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
@mutex = Monitor.new
@remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by Rack to setup the server.
@ -48,8 +48,8 @@ def remote_connections
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
def stream_event_loop
@stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
def event_loop
@event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.

@ -4,7 +4,7 @@ module Server
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
attr_accessor :use_faye, :connection_class, :worker_pool_size
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :cable, :url, :mount_path
@ -43,6 +43,22 @@ def pubsub_adapter
adapter = 'PostgreSQL' if adapter == 'Postgresql'
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
end
def event_loop_class
if use_faye
ActionCable::Connection::FayeEventLoop
else
ActionCable::Connection::StreamEventLoop
end
end
def client_socket_class
if use_faye
ActionCable::Connection::FayeClientSocket
else
ActionCable::Connection::ClientSocket
end
end
end
end
end

@ -21,9 +21,9 @@ def remove_connection(connection)
# then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
@heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
Concurrent.global_io_executor.post { connections.map(&:beat) }
end.tap(&:execute)
@heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do
event_loop.post { connections.map(&:beat) }
end
end
def open_connections_statistics

@ -5,16 +5,21 @@ module SubscriptionAdapter
class Async < Inline # :nodoc:
private
def new_subscriber_map
AsyncSubscriberMap.new
AsyncSubscriberMap.new(server.event_loop)
end
class AsyncSubscriberMap < SubscriberMap
def initialize(event_loop)
@event_loop = event_loop
super()
end
def add_subscriber(*)
Concurrent.global_io_executor.post { super }
@event_loop.post { super }
end
def invoke_callback(*)
Concurrent.global_io_executor.post { super }
@event_loop.post { super }
end
end
end

@ -42,14 +42,15 @@ def with_connection(&block) # :nodoc:
private
def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
class Listener < SubscriberMap
def initialize(adapter)
def initialize(adapter, event_loop)
super()
@adapter = adapter
@event_loop = event_loop
@queue = Queue.new
@thread = Thread.new do
@ -68,7 +69,7 @@ def listen
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
Concurrent.global_io_executor << callback if callback
@event_loop.post(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@ -98,7 +99,7 @@ def remove_channel(channel)
end
def invoke_callback(*)
Concurrent.global_io_executor.post { super }
@event_loop.post { super }
end
end
end

@ -38,7 +38,7 @@ def redis_connection_for_subscriptions
private
def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def redis_connection_for_broadcasts
@ -52,10 +52,11 @@ def redis_connection
end
class Listener < SubscriberMap
def initialize(adapter)
def initialize(adapter, event_loop)
super()
@adapter = adapter
@event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@ -84,7 +85,7 @@ def listen(conn)
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
Concurrent.global_io_executor << next_callback if next_callback
@event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
@ -133,7 +134,7 @@ def remove_channel(channel)
end
def invoke_callback(*)
Concurrent.global_io_executor.post { super }
@event_loop.post { super }
end
private

@ -31,7 +31,7 @@ def ping
end
test "timer start and stop" do
Concurrent::TimerTask.expects(:new).times(2).returns(true)
@connection.server.event_loop.expects(:timer).times(2).returns(true)
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
channel.expects(:stop_periodic_timers).once

@ -8,8 +8,8 @@
require 'json'
class ClientTest < ActionCable::TestCase
WAIT_WHEN_EXPECTING_EVENT = 3
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
WAIT_WHEN_EXPECTING_EVENT = 8
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
def setup
ActionCable.instance_variable_set(:@server, nil)
@ -17,6 +17,7 @@ def setup
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
server.config.cable = { adapter: 'async' }.with_indifferent_access
server.config.use_faye = ENV['FAYE'].present?
# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true

@ -20,7 +20,7 @@ def send_async(method, *args)
server.config.allowed_request_origins = %w( http://rubyonrails.com )
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
'HTTP_ORIGIN' => 'http://rubyonrails.com'
'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com'
connection = Connection.new(server, env)
connection.websocket.expects(:close)

@ -1,5 +1,6 @@
require 'test_helper'
require 'stubs/test_server'
require 'active_support/core_ext/object/json'
class ActionCable::Connection::BaseTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
@ -73,7 +74,7 @@ def send_async(method, *args)
connection.process
# Setup the connection
Concurrent::TimerTask.stubs(:new).returns(true)
connection.server.stubs(:timer).returns(true)
connection.send :handle_open
assert connection.connected
@ -119,7 +120,7 @@ def call(*)
env = Rack::MockRequest.env_for(
"/test",
{ 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
'HTTP_ORIGIN' => 'http://rubyonrails.org', 'rack.hijack' => CallMeMaybe.new }
'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.org', 'rack.hijack' => CallMeMaybe.new }
)
connection = ActionCable::Connection::Base.new(@server, env)
@ -131,7 +132,7 @@ def call(*)
private
def open_connection
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
'HTTP_ORIGIN' => 'http://rubyonrails.com'
'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com'
Connection.new(@server, env)
end

@ -76,6 +76,6 @@ def connect_with_origin(origin)
def env_for_origin(origin)
Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'SERVER_NAME' => HOST,
'HTTP_ORIGIN' => origin
'HTTP_HOST' => HOST, 'HTTP_ORIGIN' => origin
end
end

@ -64,7 +64,7 @@ def open_connection_with_stubbed_pubsub
end
def open_connection(server:)
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(server, env)
@connection.process

@ -28,7 +28,7 @@ def open_connection_with_stubbed_pubsub
end
def open_connection(server:)
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(server, env)
@connection.process

@ -30,7 +30,7 @@ def open_connection_with_stubbed_pubsub
end
def open_connection
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(@server, env)
@connection.process

@ -107,7 +107,7 @@ def subscribe_to_chat_channel(identifier = @chat_identifier)
end
def setup_connection
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(@server, env)
@subscriptions = ActionCable::Connection::Subscriptions.new(@connection)

@ -1,18 +1,19 @@
require 'stubs/user'
class TestConnection
attr_reader :identifiers, :logger, :current_user, :transmissions
attr_reader :identifiers, :logger, :current_user, :server, :transmissions
def initialize(user = User.new("lifo"))
@identifiers = [ :current_user ]
@current_user = user
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@server = TestServer.new
@transmissions = []
end
def pubsub
SuccessAdapter.new(TestServer.new)
SuccessAdapter.new(server)
end
def transmit(data)

@ -8,14 +8,24 @@ class TestServer
def initialize
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter)
@config.use_faye = ENV['FAYE'].present?
@config.client_socket_class = if @config.use_faye
ActionCable::Connection::FayeClientSocket
else
ActionCable::Connection::ClientSocket
end
end
def pubsub
@config.subscription_adapter.new(self)
end
def stream_event_loop
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
def event_loop
@event_loop ||= if @config.use_faye
ActionCable::Connection::FayeEventLoop.new
else
ActionCable::Connection::StreamEventLoop.new
end
end
def worker_pool

@ -11,6 +11,7 @@ module CommonSubscriptionAdapterTest
def setup
server = ActionCable::Server::Base.new
server.config.cable = cable_config.with_indifferent_access
server.config.use_faye = ENV['FAYE'].present?
adapter_klass = server.config.pubsub_adapter

@ -13,7 +13,41 @@
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
class ActionCable::TestCase < ActiveSupport::TestCase
if ENV['FAYE'].present?
require 'faye/websocket'
class << Faye::WebSocket
remove_method :ensure_reactor_running
# We don't want Faye to start the EM reactor in tests because it makes testing much harder.
# We want to be able to start and stop EM loop in tests to make things simpler.
def ensure_reactor_running
# no-op
end
end
end
module EventMachineConcurrencyHelpers
def wait_for_async
EM.run_deferred_callbacks
end
def run_in_eventmachine
failure = nil
EM.run do
begin
yield
rescue => ex
failure = ex
ensure
wait_for_async
EM.stop if EM.reactor_running?
end
end
raise failure if failure
end
end
module ConcurrentRubyConcurrencyHelpers
def wait_for_async
e = Concurrent.global_io_executor
until e.completed_task_count == e.scheduled_task_count
@ -26,3 +60,11 @@ def run_in_eventmachine
wait_for_async
end
end
class ActionCable::TestCase < ActiveSupport::TestCase
if ENV['FAYE'].present?
include EventMachineConcurrencyHelpers
else
include ConcurrentRubyConcurrencyHelpers
end
end