Make AbstractAdapter#lock thread local by default

Fix: https://github.com/rails/rails/issues/45994

A semi-common issue since Ruby 3.0.2 is that using a fiber inside
a transaction cause a deadlock:

```ruby
Post.transaction do
 enum =  Enumerator.new do |y|
   y.yield Post.first # stuck
 end
 enum.next
end
```

This is because in https://bugs.ruby-lang.org/issues/17827
Ruby changed Monitor to be owned by the calling Fiber rather than Thread.

And since the Active Record connection pool is per Thread, we end
up in a situation where a Fiber tries to acquire a lock owned by another
fiber with no change to ever resolve.

In https://github.com/rails/rails/pull/46519 We've made that lock optional
as it's only needed for system tests.

Now this PR introduce an alternative lock implementation that behave like Monitor
used to up to Ruby 2.7, and we use this one if `ActiveSupport::IsolatedExecutionState.context`
is a Thread.

If it's a Fiber, we continue to use the implementation derived from the stdlib that is Fiber based.

Co-Authored-By: Maple Ong <maple.develops@gmail.com>
This commit is contained in:
Jean Boussier 2022-11-22 18:14:41 +01:00
parent d666a39240
commit 6253874326
5 changed files with 93 additions and 13 deletions

@ -175,7 +175,7 @@ def lock_thread=(lock_thread)
end
if (active_connection = @thread_cached_conns[connection_cache_key(current_thread)])
active_connection.synchronized = lock_thread
active_connection.lock_thread = @lock_thread
end
end
@ -357,7 +357,7 @@ def clear_reloadable_connections!
# - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
def checkout(checkout_timeout = @checkout_timeout)
connection = checkout_and_verify(acquire_connection(checkout_timeout))
connection.synchronized = @lock_thread
connection.lock_thread = @lock_thread
connection
end
@ -375,6 +375,7 @@ def checkin(conn)
conn.expire
end
conn.lock_thread = nil
@available.add conn
end
end

@ -156,7 +156,7 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca
@idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@visitor = arel_visitor
@statements = build_statement_pool
self.synchronized = false
self.lock_thread = nil
@prepared_statements = self.class.type_cast_config_to_boolean(
@config.fetch(:prepared_statements) { default_prepared_statements }
@ -172,8 +172,12 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca
@verified = false
end
def synchronized=(synchronized) # :nodoc:
@lock = if synchronized
def lock_thread=(lock_thread) # :nodoc:
@lock =
case lock_thread
when Thread
ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
when Fiber
ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
else
ActiveSupport::Concurrency::NullLock

@ -801,6 +801,19 @@ def setup
@connection_test_model_class = ThreadConnectionTestModel
end
def test_lock_thread_allow_fiber_reentrency
@pool.lock_thread = true
connection = @pool.checkout
connection.transaction do
enumerator = Enumerator.new do |yielder|
connection.transaction do
yielder.yield 1
end
end
assert_equal 1, enumerator.next
end
end
private
def new_thread(...)
Thread.new(...)

@ -4,9 +4,7 @@
module ActiveSupport
module Concurrency
# A monitor that will permit dependency loading while blocked waiting for
# the lock.
class LoadInterlockAwareMonitor < Monitor
module LoadInterlockAwareMonitorMixin # :nodoc:
EXCEPTION_NEVER = { Exception => :never }.freeze
EXCEPTION_IMMEDIATE = { Exception => :immediate }.freeze
private_constant :EXCEPTION_NEVER, :EXCEPTION_IMMEDIATE
@ -29,5 +27,46 @@ def synchronize(&block)
end
end
end
# A monitor that will permit dependency loading while blocked waiting for
# the lock.
class LoadInterlockAwareMonitor < Monitor
include LoadInterlockAwareMonitorMixin
end
class ThreadLoadInterlockAwareMonitor # :nodoc:
prepend LoadInterlockAwareMonitorMixin
def initialize
@owner = nil
@count = 0
@mutex = Mutex.new
end
private
def mon_try_enter
if @owner != Thread.current
return false unless @mutex.try_lock
@owner = Thread.current
end
@count += 1
end
def mon_enter
@mutex.lock if @owner != Thread.current
@owner = Thread.current
@count += 1
end
def mon_exit
unless @owner == Thread.current
raise ThreadError, "current thread not owner"
end
@count -= 1
return unless @count == 0
@owner = nil
@mutex.unlock
end
end
end
end

@ -6,11 +6,7 @@
module ActiveSupport
module Concurrency
class LoadInterlockAwareMonitorTest < ActiveSupport::TestCase
def setup
@monitor = ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
end
module LoadInterlockAwareMonitorTests
def test_entering_with_no_blocking
assert @monitor.mon_enter
end
@ -51,5 +47,32 @@ def test_entering_with_blocking
assert able_to_load
end
end
class LoadInterlockAwareMonitorTest < ActiveSupport::TestCase
include LoadInterlockAwareMonitorTests
def setup
@monitor = ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
end
end
class ThreadLoadInterlockAwareMonitorTest < ActiveSupport::TestCase
include LoadInterlockAwareMonitorTests
def setup
@monitor = ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
end
def test_lock_owned_by_thread
@monitor.synchronize do
enumerator = Enumerator.new do |yielder|
@monitor.synchronize do
yielder.yield 42
end
end
assert_equal 42, enumerator.next
end
end
end
end
end