Merge pull request #51192 from Shopify/connection-leasing-2

Make `.connection` always return a permanently leased connection
This commit is contained in:
Jean Boussier 2024-03-01 11:50:17 +01:00 committed by GitHub
commit 75e3407917
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 178 additions and 45 deletions

@ -113,6 +113,90 @@ def db_config
# * private methods that require being called in a +synchronize+ blocks # * private methods that require being called in a +synchronize+ blocks
# are now explicitly documented # are now explicitly documented
class ConnectionPool class ConnectionPool
class Lease # :nodoc:
attr_accessor :connection, :sticky
def initialize
@connection = nil
@sticky = false
end
def release
conn = @connection
@connection = nil
@sticky = false
conn
end
def clear(connection)
if @connection == connection
@connection = nil
@sticky = false
true
else
false
end
end
end
if ObjectSpace.const_defined?(:WeakKeyMap) # RUBY_VERSION >= 3.3
WeakKeyMap = ::ObjectSpace::WeakKeyMap # :nodoc:
else
class WeakKeyMap # :nodoc:
def initialize
@map = ObjectSpace::WeakMap.new
@values = nil
@size = 0
end
alias_method :clear, :initialize
def [](key)
prune if @map.size != @size
@map[key]
end
def []=(key, value)
@map[key] = value
prune if @map.size != @size
value
end
def delete(key)
if value = self[key]
self[key] = nil
prune
end
value
end
private
def prune(force = false)
@values = @map.values
@size = @map.size
end
end
end
class LeaseRegistry # :nodoc:
def initialize
@mutex = Mutex.new
@map = WeakKeyMap.new
end
def [](context)
@mutex.synchronize do
@map[context] ||= Lease.new
end
end
def clear
@mutex.synchronize do
@map = WeakKeyMap.new
end
end
end
include MonitorMixin include MonitorMixin
prepend QueryCache::ConnectionPoolConfiguration prepend QueryCache::ConnectionPoolConfiguration
include ConnectionAdapters::AbstractPool include ConnectionAdapters::AbstractPool
@ -148,9 +232,9 @@ def initialize(pool_config)
# then that +thread+ does indeed own that +conn+. However, an absence of such # then that +thread+ does indeed own that +conn+. However, an absence of such
# mapping does not mean that the +thread+ doesn't own the said connection. In # mapping does not mean that the +thread+ doesn't own the said connection. In
# that case +conn.owner+ attr should be consulted. # that case +conn.owner+ attr should be consulted.
# Access and modification of <tt>@thread_cached_conns</tt> does not require # Access and modification of <tt>@leases</tt> does not require
# synchronization. # synchronization.
@thread_cached_conns = Concurrent::Map.new(initial_capacity: @size) @leases = LeaseRegistry.new
@connections = [] @connections = []
@automatic_reconnect = true @automatic_reconnect = true
@ -203,14 +287,18 @@ def internal_metadata # :nodoc:
# #
# #connection can be called any number of times; the connection is # #connection can be called any number of times; the connection is
# held in a cache keyed by a thread. # held in a cache keyed by a thread.
def connection def lease_connection
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] ||= checkout lease = connection_lease
lease.sticky = true
lease.connection ||= checkout
end end
alias_method :connection, :lease_connection # TODO: deprecate
def pin_connection!(lock_thread) # :nodoc: def pin_connection!(lock_thread) # :nodoc:
raise "There is already a pinned connection" if @pinned_connection raise "There is already a pinned connection" if @pinned_connection
@pinned_connection = (@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] || checkout) @pinned_connection = (connection_lease&.connection || checkout)
# Any leased connection must be in @connections otherwise # Any leased connection must be in @connections otherwise
# some methods like #connected? won't behave correctly # some methods like #connected? won't behave correctly
unless @connections.include?(@pinned_connection) unless @connections.include?(@pinned_connection)
@ -252,7 +340,7 @@ def connection_class # :nodoc:
# #connection or #with_connection methods. Connections obtained through # #connection or #with_connection methods. Connections obtained through
# #checkout will not be detected by #active_connection? # #checkout will not be detected by #active_connection?
def active_connection? def active_connection?
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] connection_lease.connection
end end
# Signal that the thread is finished with the current connection. # Signal that the thread is finished with the current connection.
@ -262,10 +350,12 @@ def active_connection?
# This method only works for connections that have been obtained through # This method only works for connections that have been obtained through
# #connection or #with_connection methods, connections obtained through # #connection or #with_connection methods, connections obtained through
# #checkout will not be automatically released. # #checkout will not be automatically released.
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context) def release_connection(existing_lease = nil)
if conn = @thread_cached_conns.delete(owner_thread) if conn = connection_lease.release
checkin conn checkin conn
return true
end end
false
end end
# Yields a connection from the connection pool to the block. If no connection # Yields a connection from the connection pool to the block. If no connection
@ -278,13 +368,14 @@ def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.cont
# connection will be properly returned to the pool by the code that checked # connection will be properly returned to the pool by the code that checked
# it out. # it out.
def with_connection def with_connection
if conn = @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] lease = connection_lease
yield conn if lease.connection
yield lease.connection
else else
begin begin
yield connection yield lease.connection = checkout
ensure ensure
release_connection release_connection(lease) unless lease.sticky
end end
end end
end end
@ -326,7 +417,7 @@ def disconnect(raise_on_acquisition_timeout = true)
conn.disconnect! conn.disconnect!
end end
@connections = [] @connections = []
@thread_cached_conns.clear @leases.clear
@available.clear @available.clear
end end
end end
@ -353,7 +444,7 @@ def discard! # :nodoc:
@connections.each do |conn| @connections.each do |conn|
conn.discard! conn.discard!
end end
@connections = @available = @thread_cached_conns = nil @connections = @available = @leases = nil
end end
end end
@ -436,7 +527,7 @@ def checkin(conn)
conn.lock.synchronize do conn.lock.synchronize do
synchronize do synchronize do
remove_connection_from_thread_cache conn connection_lease.clear(conn)
conn._run_checkin_callbacks do conn._run_checkin_callbacks do
conn.expire conn.expire
@ -560,6 +651,10 @@ def schedule_query(future_result) # :nodoc:
end end
private private
def connection_lease
@leases[ActiveSupport::IsolatedExecutionState.context]
end
def build_async_executor def build_async_executor
case ActiveRecord.async_query_executor case ActiveRecord.async_query_executor
when :multi_thread_pool when :multi_thread_pool
@ -734,17 +829,10 @@ def acquire_connection(checkout_timeout)
#-- #--
# if owner_thread param is omitted, this must be called in synchronize block # if owner_thread param is omitted, this must be called in synchronize block
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
@thread_cached_conns.delete_pair(owner_thread, conn) @leases[owner_thread].clear(conn)
end end
alias_method :release, :remove_connection_from_thread_cache alias_method :release, :remove_connection_from_thread_cache
def prune_thread_cache
dead_threads = @thread_cached_conns.keys.reject(&:alive?)
dead_threads.each do |dead_thread|
@thread_cached_conns.delete(dead_thread)
end
end
def new_connection def new_connection
connection = db_config.new_connection connection = db_config.new_connection
connection.pool = self connection.pool = self
@ -788,6 +876,12 @@ def try_to_checkout_new_connection
def adopt_connection(conn) def adopt_connection(conn)
conn.pool = self conn.pool = self
@connections << conn @connections << conn
# We just created the first connection, it's time to load the schema
# cache if that wasn't eagerly done before
if @schema_cache.nil? && ActiveRecord.lazily_load_schema_cache
schema_cache.load!
end
end end
def checkout_new_connection def checkout_new_connection

@ -89,7 +89,7 @@ def initialize(...)
end end
end end
def checkout(...) def lease_connection
connection = super connection = super
connection.query_cache ||= query_cache connection.query_cache ||= query_cache
connection connection
@ -141,7 +141,6 @@ def clear_query_cache
private private
def prune_thread_cache def prune_thread_cache
super
dead_threads = @thread_query_caches.keys.reject(&:alive?) dead_threads = @thread_query_caches.keys.reject(&:alive?)
dead_threads.each do |dead_thread| dead_threads.each do |dead_thread|
@thread_query_caches.delete(dead_thread) @thread_query_caches.delete(dead_thread)

@ -49,10 +49,6 @@ def pool=(value)
return if value.eql?(@pool) return if value.eql?(@pool)
@schema_cache = nil @schema_cache = nil
@pool = value @pool = value
if @pool && ActiveRecord.lazily_load_schema_cache
@pool.schema_reflection.load!(@pool)
end
end end
set_callback :checkin, :after, :enable_lazy_transactions! set_callback :checkin, :after, :enable_lazy_transactions!

@ -250,8 +250,17 @@ def clear_query_caches_for_current_thread
# Returns the connection currently associated with the class. This can # Returns the connection currently associated with the class. This can
# also be used to "borrow" the connection to do database work unrelated # also be used to "borrow" the connection to do database work unrelated
# to any of the specific Active Records. # to any of the specific Active Records.
def connection # The connection will remain leased for the entire duration of the request
connection_pool.connection # or job, or until +#release_connection+ is called.
def lease_connection
connection_pool.lease_connection
end
alias_method :connection, :lease_connection
# Return the currently leased connection into the pool
def release_connection
connection.release_connection
end end
# Checkouts a connection from the pool, yield it and then check it back in. # Checkouts a connection from the pool, yield it and then check it back in.

@ -11,12 +11,28 @@ class ConnectionHandlingTest < ActiveRecord::TestCase
ActiveRecord::Base.with_connection do |connection| ActiveRecord::Base.with_connection do |connection|
assert_predicate ActiveRecord::Base.connection_pool, :active_connection? assert_predicate ActiveRecord::Base.connection_pool, :active_connection?
assert_same connection, ActiveRecord::Base.connection
end end
assert_not_predicate ActiveRecord::Base.connection_pool, :active_connection? assert_not_predicate ActiveRecord::Base.connection_pool, :active_connection?
end end
test "#connection makes the lease permanent even inside #with_connection" do
ActiveRecord::Base.connection_pool.release_connection
assert_not_predicate ActiveRecord::Base.connection_pool, :active_connection?
conn = nil
ActiveRecord::Base.with_connection do |connection|
conn = connection
assert_predicate ActiveRecord::Base.connection_pool, :active_connection?
2.times do
assert_same connection, ActiveRecord::Base.connection
end
end
assert_predicate ActiveRecord::Base.connection_pool, :active_connection?
assert_same conn, ActiveRecord::Base.connection
end
test "#with_connection use the already leased connection if available" do test "#with_connection use the already leased connection if available" do
leased_connection = ActiveRecord::Base.connection leased_connection = ActiveRecord::Base.connection
assert_predicate ActiveRecord::Base.connection_pool, :active_connection? assert_predicate ActiveRecord::Base.connection_pool, :active_connection?

@ -46,10 +46,6 @@ def teardown
ActiveSupport::IsolatedExecutionState.isolation_level = @previous_isolation_level ActiveSupport::IsolatedExecutionState.isolation_level = @previous_isolation_level
end end
def active_connections(pool)
pool.connections.find_all(&:in_use?)
end
def test_checkout_after_close def test_checkout_after_close
connection = pool.connection connection = pool.connection
assert_predicate connection, :in_use? assert_predicate connection, :in_use?
@ -90,6 +86,16 @@ def test_with_connection
assert_equal 2, active_connections(pool).size assert_equal 2, active_connections(pool).size
end end
assert_equal 1, active_connections(pool).size assert_equal 1, active_connections(pool).size
pool.with_connection do |conn|
assert conn
assert_equal 2, active_connections(pool).size
pool.connection # lease
end
assert_equal 2, active_connections(pool).size
pool.release_connection
assert_equal 1, active_connections(pool).size
}.join }.join
main_thread.close main_thread.close
@ -697,15 +703,24 @@ def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads
def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_acquire_all_connections_proceed_anyway def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_acquire_all_connections_proceed_anyway
@pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
[:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
@pool.with_connection do |connection|
new_thread { @pool.send(group_action_method) }.join
# assert connection has been forcefully taken away from us
assert_not_predicate @pool, :active_connection?
# make a new connection for with_connection to clean up @pool.with_connection do |connection|
@pool.connection new_thread { @pool.disconnect! }.join
end # assert connection has been forcefully taken away from us
assert_not_predicate @pool, :active_connection?
# make a new connection for with_connection to clean up
@pool.connection
end
@pool.release_connection
@pool.with_connection do |connection|
new_thread { @pool.clear_reloadable_connections! }.join
# assert connection has been forcefully taken away from us
assert_not_predicate @pool, :active_connection?
# make a new connection for with_connection to clean up
@pool.connection
end end
end end
@ -920,6 +935,10 @@ def test_unpin_connection_returns_whether_transaction_has_been_rolledback
end end
private private
def active_connections(pool)
pool.connections.find_all(&:in_use?)
end
def with_single_connection_pool def with_single_connection_pool
config = @db_config.configuration_hash.merge(pool: 1) config = @db_config.configuration_hash.merge(pool: 1)
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new("arunit", "primary", config) db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new("arunit", "primary", config)

@ -36,7 +36,7 @@ def test_pooled_connection_remove
old_connection = ActiveRecord::Base.connection old_connection = ActiveRecord::Base.connection
extra_connection = ActiveRecord::Base.connection_pool.checkout extra_connection = ActiveRecord::Base.connection_pool.checkout
ActiveRecord::Base.connection_pool.remove(extra_connection) ActiveRecord::Base.connection_pool.remove(extra_connection)
assert_equal ActiveRecord::Base.connection, old_connection assert_equal ActiveRecord::Base.connection.object_id, old_connection.object_id
end end
private private