Action Cable owns database connection, not Active Record

Before this commit, the database connection used in Action Cable's
PostgreSQL adapter was "owned" by `ActiveRecord::Base.connection_pool`.
This meant that if, for example, `#clear_reloadable_connections!` was called on the pool, Active
Record would "steal" the database connection from Action Cable, and
would cause all sorts of issues. This became evident during file
reloads; despite Action Cable trying its hardest to return its borrowed
database connection to Active Record via `@pubsub.shutdown`, Active Record calls
`#clear_reloadable_connections!` on the connection pool, and due to the order of callbacks, Active
Record's callback was being executed first. This meant that if you tried
to rerender a view after a file was reloaded, you would have to wait
through Active Record's timeout and such.

Now, Action Cable takes direct ownership of the database connection it
uses. It removes the connection from the pool to avoid the situation
described above. Action Cable also makes sure to call `#disconnect!` on
the connection when appropriate, to match the previous behavior of
Active Record.

[ Jon Moss & Matthew Draper]
This commit is contained in:
Jon Moss 2017-01-04 21:50:15 -05:00
parent d96dde82b7
commit 686f6a762f
2 changed files with 47 additions and 8 deletions

@ -11,7 +11,7 @@ def initialize(*)
end
def broadcast(channel, payload)
with_connection do |pg_conn|
with_broadcast_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
end
end
@ -28,14 +28,24 @@ def shutdown
listener.shutdown
end
def with_connection(&block) # :nodoc:
def with_subscriptions_connection(&block) # :nodoc:
ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn|
# Action Cable is taking ownership over this database connection, and
# will perform the necessary cleanup tasks
ActiveRecord::Base.connection_pool.remove(conn)
end
pg_conn = ar_conn.raw_connection
verify!(pg_conn)
yield pg_conn
ensure
ar_conn.disconnect!
end
def with_broadcast_connection(&block) # :nodoc:
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
pg_conn = ar_conn.raw_connection
unless pg_conn.is_a?(PG::Connection)
raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter"
end
verify!(pg_conn)
yield pg_conn
end
end
@ -45,6 +55,12 @@ def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def verify!(pg_conn)
unless pg_conn.is_a?(PG::Connection)
raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter"
end
end
class Listener < SubscriberMap
def initialize(adapter, event_loop)
super()
@ -60,7 +76,7 @@ def initialize(adapter, event_loop)
end
def listen
@adapter.with_connection do |pg_conn|
@adapter.with_subscriptions_connection do |pg_conn|
catch :shutdown do
loop do
until @queue.empty?

@ -37,4 +37,27 @@ def teardown
def cable_config
{ adapter: "postgresql" }
end
def test_clear_active_record_connections_adapter_still_works
server = ActionCable::Server::Base.new
server.config.cable = cable_config.with_indifferent_access
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
adapter_klass = Class.new(server.config.pubsub_adapter) do
def active?
!@listener.nil?
end
end
adapter = adapter_klass.new(server)
subscribe_as_queue("channel", adapter) do |queue|
adapter.broadcast("channel", "hello world")
assert_equal "hello world", queue.pop
end
ActiveRecord::Base.clear_reloadable_connections!
assert adapter.active?
end
end