Merge pull request #50999 from Shopify/refactor-transactional-fixtures

Decouple transactional fixtures and active connections
This commit is contained in:
Jean Boussier 2024-02-12 09:54:39 +01:00 committed by GitHub
commit a8d6d477c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 197 additions and 109 deletions

@ -162,8 +162,7 @@ def initialize(pool_config)
@threads_blocking_new_connections = 0 @threads_blocking_new_connections = 0
@available = ConnectionLeasingQueue.new self @available = ConnectionLeasingQueue.new self
@pinned_connection = nil
@lock_thread = false
@async_executor = build_async_executor @async_executor = build_async_executor
@ -171,25 +170,48 @@ def initialize(pool_config)
@reaper.run @reaper.run
end end
def lock_thread=(lock_thread)
if lock_thread
@lock_thread = ActiveSupport::IsolatedExecutionState.context
else
@lock_thread = nil
end
if (active_connection = @thread_cached_conns[connection_cache_key(current_thread)])
active_connection.lock_thread = @lock_thread
end
end
# Retrieve the connection associated with the current thread, or call # Retrieve the connection associated with the current thread, or call
# #checkout to obtain one if necessary. # #checkout to obtain one if necessary.
# #
# #connection can be called any number of times; the connection is # #connection can be called any number of times; the connection is
# held in a cache keyed by a thread. # held in a cache keyed by a thread.
def connection def connection
@thread_cached_conns[connection_cache_key(current_thread)] ||= checkout @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout
end
def pin_connection!(lock_thread) # :nodoc:
raise "There is already a pinned connection" if @pinned_connection
@pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout)
# Any leased connection must be in @connections otherwise
# some methods like #connected? won't behave correctly
unless @connections.include?(@pinned_connection)
@connections << @pinned_connection
end
@pinned_connection.lock_thread = ActiveSupport::IsolatedExecutionState.context if lock_thread
@pinned_connection.verify! # eagerly validate the connection
@pinned_connection.begin_transaction joinable: false, _lazy: false
end
def unpin_connection! # :nodoc:
raise "There isn't a pinned connection #{object_id}" unless @pinned_connection
clean = true
@pinned_connection.lock.synchronize do
connection, @pinned_connection = @pinned_connection, nil
if connection.transaction_open?
connection.rollback_transaction
else
# Something committed or rolled back the transaction
clean = false
connection.reset!
end
connection.lock_thread = nil
checkin(connection)
end
clean
end end
def connection_class # :nodoc: def connection_class # :nodoc:
@ -204,7 +226,7 @@ def connection_class # :nodoc:
# #connection or #with_connection methods. Connections obtained through # #connection or #with_connection methods. Connections obtained through
# #checkout will not be detected by #active_connection? # #checkout will not be detected by #active_connection?
def active_connection? def active_connection?
@thread_cached_conns[connection_cache_key(current_thread)] @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
end end
# Signal that the thread is finished with the current connection. # Signal that the thread is finished with the current connection.
@ -276,6 +298,7 @@ def disconnect(raise_on_acquisition_timeout = true)
conn.disconnect! conn.disconnect!
end end
@connections = [] @connections = []
@thread_cached_conns.clear
@available.clear @available.clear
end end
end end
@ -360,9 +383,19 @@ def clear_reloadable_connections!
# Raises: # Raises:
# - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool. # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
def checkout(checkout_timeout = @checkout_timeout) def checkout(checkout_timeout = @checkout_timeout)
connection = checkout_and_verify(acquire_connection(checkout_timeout)) if @pinned_connection
connection.lock_thread = @lock_thread synchronize do
connection @pinned_connection.verify!
# Any leased connection must be in @connections otherwise
# some methods like #connected? won't behave correctly
unless @connections.include?(@pinned_connection)
@connections << @pinned_connection
end
end
@pinned_connection
else
checkout_and_verify(acquire_connection(checkout_timeout))
end
end end
# Check-in a database connection back into the pool, indicating that you # Check-in a database connection back into the pool, indicating that you
@ -371,6 +404,8 @@ def checkout(checkout_timeout = @checkout_timeout)
# +conn+: an AbstractAdapter object, which was obtained by earlier by # +conn+: an AbstractAdapter object, which was obtained by earlier by
# calling #checkout on this pool. # calling #checkout on this pool.
def checkin(conn) def checkin(conn)
return if @pinned_connection.equal?(conn)
conn.lock.synchronize do conn.lock.synchronize do
synchronize do synchronize do
remove_connection_from_thread_cache conn remove_connection_from_thread_cache conn
@ -379,7 +414,6 @@ def checkin(conn)
conn.expire conn.expire
end end
conn.lock_thread = nil
@available.add conn @available.add conn
end end
end end
@ -533,10 +567,6 @@ def connection_cache_key(thread)
thread thread
end end
def current_thread
@lock_thread || ActiveSupport::IsolatedExecutionState.context
end
# Take control of all existing connections so a "group" action such as # Take control of all existing connections so a "group" action such as
# reload/disconnect can be performed safely. It is no longer enough to # reload/disconnect can be performed safely. It is no longer enough to
# wrap it in +synchronize+ because some pool's actions are allowed # wrap it in +synchronize+ because some pool's actions are allowed

@ -36,17 +36,17 @@ def initialize(*)
end end
def enable_query_cache! def enable_query_cache!
@query_cache_enabled[connection_cache_key(current_thread)] = true @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true
connection.enable_query_cache! if active_connection? connection.enable_query_cache! if active_connection?
end end
def disable_query_cache! def disable_query_cache!
@query_cache_enabled.delete connection_cache_key(current_thread) @query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context)
connection.disable_query_cache! if active_connection? connection.disable_query_cache! if active_connection?
end end
def query_cache_enabled def query_cache_enabled
@query_cache_enabled[connection_cache_key(current_thread)] @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
end end
end end

@ -174,19 +174,13 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca
@verified = false @verified = false
end end
THREAD_LOCK = ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
private_constant :THREAD_LOCK
FIBER_LOCK = ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
private_constant :FIBER_LOCK
def lock_thread=(lock_thread) # :nodoc: def lock_thread=(lock_thread) # :nodoc:
@lock = @lock =
case lock_thread case lock_thread
when Thread when Thread
THREAD_LOCK ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
when Fiber when Fiber
FIBER_LOCK ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
else else
ActiveSupport::Concurrency::NullLock ActiveSupport::Concurrency::NullLock
end end

@ -131,8 +131,8 @@ def setup_fixtures(config = ActiveRecord::Base)
end end
@fixture_cache = {} @fixture_cache = {}
@fixture_connections = {}
@fixture_cache_key = [self.class.fixture_table_names.dup, self.class.fixture_paths.dup, self.class.fixture_class_names.dup] @fixture_cache_key = [self.class.fixture_table_names.dup, self.class.fixture_paths.dup, self.class.fixture_class_names.dup]
@fixture_connection_pools = []
@@already_loaded_fixtures ||= {} @@already_loaded_fixtures ||= {}
@connection_subscriber = nil @connection_subscriber = nil
@saved_pool_configs = Hash.new { |hash, key| hash[key] = {} } @saved_pool_configs = Hash.new { |hash, key| hash[key] = {} }
@ -163,20 +163,24 @@ def teardown_fixtures
teardown_transactional_fixtures teardown_transactional_fixtures
else else
ActiveRecord::FixtureSet.reset_cache ActiveRecord::FixtureSet.reset_cache
invalidate_already_loaded_fixtures
end end
ActiveRecord::Base.connection_handler.clear_active_connections!(:all) ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
end end
def invalidate_already_loaded_fixtures
@@already_loaded_fixtures.clear
end
def setup_transactional_fixtures def setup_transactional_fixtures
setup_shared_connection_pool setup_shared_connection_pool
# Begin transactions for connections already established # Begin transactions for connections already established
@fixture_connections = ActiveRecord::Base.connection_handler.connection_pool_list(:writing).to_h do |pool| @fixture_connection_pools = ActiveRecord::Base.connection_handler.connection_pool_list(:writing)
pool.lock_thread = true if lock_threads @fixture_connection_pools.each do |pool|
connection = pool.connection pool.pin_connection!(lock_threads)
transaction = connection.begin_transaction(joinable: false, _lazy: false) pool.connection
[connection, transaction]
end end
# When connections are established in the future, begin a transaction too # When connections are established in the future, begin a transaction too
@ -185,19 +189,14 @@ def setup_transactional_fixtures
shard = payload[:shard] if payload.key?(:shard) shard = payload[:shard] if payload.key?(:shard)
if connection_name if connection_name
begin pool = ActiveRecord::Base.connection_handler.retrieve_connection_pool(connection_name, shard: shard)
connection = ActiveRecord::Base.connection_handler.retrieve_connection(connection_name, shard: shard) if pool
connection.connect! # eagerly validate the connection
rescue ConnectionNotEstablished
connection = nil
end
if connection
setup_shared_connection_pool setup_shared_connection_pool
if !@fixture_connections.key?(connection) unless @fixture_connection_pools.include?(pool)
connection.pool.lock_thread = true if lock_threads pool.pin_connection!(lock_threads)
@fixture_connections[connection] = connection.begin_transaction(joinable: false, _lazy: false) pool.connection
@fixture_connection_pools << pool
end end
end end
end end
@ -206,27 +205,15 @@ def setup_transactional_fixtures
def teardown_transactional_fixtures def teardown_transactional_fixtures
ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber
@fixture_connections.each do |connection, transaction| unless @fixture_connection_pools.map(&:unpin_connection!).all?
begin # Something caused the transaction to be committed or rolled back
connection.rollback_transaction(transaction) # We can no longer trust the database is in a clean state.
rescue ActiveRecord::StatementInvalid @@already_loaded_fixtures.clear
# Something commited or rolled back the transaction.
# We can no longer trust the database state is clean.
invalidate_already_loaded_fixtures
# We also don't know for sure the connection wasn't
# mutated in dangerous ways.
connection.disconnect!
end
connection.pool.lock_thread = false
end end
@fixture_connections.clear @fixture_connection_pools.clear
teardown_shared_connection_pool teardown_shared_connection_pool
end end
def invalidate_already_loaded_fixtures
@@already_loaded_fixtures.clear
end
# Shares the writing connection pool with connections on # Shares the writing connection pool with connections on
# other handlers. # other handlers.
# #

@ -37,7 +37,7 @@ def test_no_automatic_reconnection_after_timeout
assert_not_predicate @connection, :active? assert_not_predicate @connection, :active?
ensure ensure
# Repair all fixture connections so other tests won't break. # Repair all fixture connections so other tests won't break.
@fixture_connections.each_key(&:verify!) @fixture_connection_pools.each { |p| p.connection.verify! }
end end
def test_successful_reconnection_after_timeout_with_manual_reconnect def test_successful_reconnection_after_timeout_with_manual_reconnect

@ -145,7 +145,7 @@ def test_reconnection_after_actual_disconnection_with_verify
assert_predicate @connection, :active? assert_predicate @connection, :active?
ensure ensure
# Repair all fixture connections so other tests won't break. # Repair all fixture connections so other tests won't break.
@fixture_connections.each_key(&:verify!) @fixture_connection_pools.each { |p| p.connection.verify! }
end end
def test_set_session_variable_true def test_set_session_variable_true

@ -6,8 +6,6 @@
module ActiveRecord module ActiveRecord
module ConnectionAdapters module ConnectionAdapters
class ConnectionHandlerTest < ActiveRecord::TestCase class ConnectionHandlerTest < ActiveRecord::TestCase
self.use_transactional_tests = false
fixtures :people fixtures :people
def setup def setup
@ -95,8 +93,6 @@ def test_fixtures_dont_raise_if_theres_no_writing_pool_config
connection_handler = ActiveRecord::Base.connection_handler connection_handler = ActiveRecord::Base.connection_handler
ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
setup_transactional_fixtures
assert_nothing_raised do assert_nothing_raised do
ActiveRecord::Base.connects_to(database: { reading: :arunit, writing: :arunit }) ActiveRecord::Base.connects_to(database: { reading: :arunit, writing: :arunit })
end end
@ -105,8 +101,6 @@ def test_fixtures_dont_raise_if_theres_no_writing_pool_config
ro_conn = ActiveRecord::Base.connection_handler.retrieve_connection("ActiveRecord::Base", role: :reading) ro_conn = ActiveRecord::Base.connection_handler.retrieve_connection("ActiveRecord::Base", role: :reading)
assert_equal rw_conn, ro_conn assert_equal rw_conn, ro_conn
teardown_transactional_fixtures
ensure ensure
ActiveRecord::Base.connection_handler = connection_handler ActiveRecord::Base.connection_handler = connection_handler
end end

@ -845,6 +845,69 @@ def test_role_and_shard_is_returned
assert_equal :shard_one, pool.connection.shard assert_equal :shard_one, pool.connection.shard
end end
def test_pin_connection_always_returns_the_same_connection
assert_not_predicate @pool, :active_connection?
@pool.pin_connection!(true)
pinned_connection = @pool.checkout
assert_not_predicate @pool, :active_connection?
assert_same pinned_connection, @pool.connection
assert_predicate @pool, :active_connection?
assert_same pinned_connection, @pool.checkout
@pool.release_connection
assert_not_predicate @pool, :active_connection?
assert_same pinned_connection, @pool.checkout
end
def test_pin_connection_connected?
skip("Can't test with in-memory dbs") if in_memory_db?
assert_not_predicate @pool, :connected?
@pool.pin_connection!(true)
assert_predicate @pool, :connected?
pin_connection = @pool.checkout
@pool.disconnect
assert_not_predicate @pool, :connected?
assert_same pin_connection, @pool.checkout
assert_predicate @pool, :connected?
end
def test_pin_connection_synchronize_the_connection
assert_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock
@pool.pin_connection!(true)
assert_not_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock
@pool.unpin_connection!
assert_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock
@pool.pin_connection!(false)
assert_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock
end
def test_pin_connection_opens_a_transaction
assert_instance_of NullTransaction, @pool.connection.current_transaction
@pool.pin_connection!(true)
assert_instance_of RealTransaction, @pool.connection.current_transaction
@pool.unpin_connection!
assert_instance_of NullTransaction, @pool.connection.current_transaction
end
def test_unpin_connection_returns_whether_transaction_has_been_rolledback
@pool.pin_connection!(true)
assert_equal true, @pool.unpin_connection!
@pool.pin_connection!(true)
@pool.connection.commit_transaction
assert_equal false, @pool.unpin_connection!
@pool.pin_connection!(true)
@pool.connection.rollback_transaction
assert_equal false, @pool.unpin_connection!
end
private private
def with_single_connection_pool def with_single_connection_pool
config = @db_config.configuration_hash.merge(pool: 1) config = @db_config.configuration_hash.merge(pool: 1)
@ -871,8 +934,8 @@ def setup
end end
def test_lock_thread_allow_fiber_reentrency def test_lock_thread_allow_fiber_reentrency
@pool.lock_thread = true
connection = @pool.checkout connection = @pool.checkout
connection.lock_thread = ActiveSupport::IsolatedExecutionState.context
connection.transaction do connection.transaction do
enumerator = Enumerator.new do |yielder| enumerator = Enumerator.new do |yielder|
connection.transaction do connection.transaction do

@ -1050,12 +1050,16 @@ def rollback_transaction(*args); end
def connect!; end def connect!; end
end.new end.new
connection.pool = Class.new do pool = connection.pool = Class.new do
def lock_thread=(lock_thread); end attr_reader :connection
end.new def initialize(connection); @connection = connection; end
def release_connection; end
def pin_connection!(_); end
def unpin_connection!; @connection.rollback_transaction; true; end
end.new(connection)
assert_called_with(connection, :begin_transaction, [], joinable: false, _lazy: false) do assert_called_with(pool, :pin_connection!, [true]) do
fire_connection_notification(connection) fire_connection_notification(connection.pool)
end end
end end
@ -1069,14 +1073,19 @@ def begin_transaction(*args); end
def rollback_transaction(*args) def rollback_transaction(*args)
@rollback_transaction_called = true @rollback_transaction_called = true
end end
def lock_thread=(lock_thread); end
def connect!; end def connect!; end
end.new end.new
connection.pool = Class.new do connection.pool = Class.new do
def lock_thread=(lock_thread); end attr_reader :connection
end.new def initialize(connection); @connection = connection; end
def release_connection; end
def pin_connection!(_); end
def unpin_connection!; @connection.rollback_transaction; true; end
end.new(connection)
fire_connection_notification(connection) fire_connection_notification(connection.pool)
teardown_fixtures teardown_fixtures
assert(connection.rollback_transaction_called, "Expected <mock connection>#rollback_transaction to be called but was not") assert(connection.rollback_transaction_called, "Expected <mock connection>#rollback_transaction to be called but was not")
@ -1093,17 +1102,21 @@ def connect!; end
end.new end.new
connection.pool = Class.new do connection.pool = Class.new do
def lock_thread=(lock_thread); end attr_reader :connection
end.new def initialize(connection); @connection = connection; end
def release_connection; end
def pin_connection!(_); end
def unpin_connection!; @connection.rollback_transaction; true; end
end.new(connection)
assert_called_with(connection, :begin_transaction, [], joinable: false, _lazy: false) do assert_called_with(connection.pool, :pin_connection!, [true]) do
fire_connection_notification(connection, shard: :shard_two) fire_connection_notification(connection.pool, shard: :shard_two)
end end
end end
private private
def fire_connection_notification(connection, shard: ActiveRecord::Base.default_shard) def fire_connection_notification(pool, shard: ActiveRecord::Base.default_shard)
assert_called_with(ActiveRecord::Base.connection_handler, :retrieve_connection, ["book"], returns: connection, shard: shard) do assert_called_with(ActiveRecord::Base.connection_handler, :retrieve_connection_pool, ["book"], returns: pool, shard: shard) do
message_bus = ActiveSupport::Notifications.instrumenter message_bus = ActiveSupport::Notifications.instrumenter
payload = { payload = {
connection_name: "book", connection_name: "book",
@ -1152,7 +1165,7 @@ def test_this_should_run_cleanly
class FixturesBrokenRollbackTest < ActiveRecord::TestCase class FixturesBrokenRollbackTest < ActiveRecord::TestCase
def blank_setup def blank_setup
@fixture_connections = [ActiveRecord::Base.connection] @fixture_connection_pools = [ActiveRecord::Base.connection_pool]
end end
alias_method :ar_setup_fixtures, :setup_fixtures alias_method :ar_setup_fixtures, :setup_fixtures
alias_method :setup_fixtures, :blank_setup alias_method :setup_fixtures, :blank_setup

@ -3,8 +3,9 @@
require "cases/helper" require "cases/helper"
class TestAdapterWithInvalidConnection < ActiveRecord::TestCase class TestAdapterWithInvalidConnection < ActiveRecord::TestCase
if current_adapter?(:Mysql2Adapter, :TrilogyAdapter) self.use_transactional_tests = false
if current_adapter?(:Mysql2Adapter, :TrilogyAdapter)
class Bird < ActiveRecord::Base class Bird < ActiveRecord::Base
end end

@ -634,22 +634,24 @@ def test_clear_query_cache_is_called_on_all_connections
end end
test "query cache is enabled in threads with shared connection" do test "query cache is enabled in threads with shared connection" do
ActiveRecord::Base.connection_pool.lock_thread = true ActiveRecord::Base.connection_pool.pin_connection!(ActiveSupport::IsolatedExecutionState.context)
assert_cache :off begin
ActiveRecord::Base.connection.enable_query_cache! assert_cache :off
assert_cache :clean ActiveRecord::Base.connection.enable_query_cache!
assert_cache :clean
thread_a = Thread.new do thread_a = Thread.new do
middleware { |env| middleware { |env|
assert_cache :clean assert_cache :clean
[200, {}, nil] [200, {}, nil]
}.call({}) }.call({})
end
thread_a.join
ensure
ActiveRecord::Base.connection_pool.unpin_connection!
end end
thread_a.join
ActiveRecord::Base.connection_pool.lock_thread = false
end end
private private

@ -56,6 +56,9 @@ def test_run_successfully
end end
end end
ActiveSupport::Notifications.unsubscribe(@connection_subscriber)
@connection_subscriber = nil
old_handler = ActiveRecord::Base.connection_handler old_handler = ActiveRecord::Base.connection_handler
ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
ActiveRecord::Base.establish_connection(:arunit) ActiveRecord::Base.establish_connection(:arunit)

@ -898,7 +898,7 @@ def test_crash
end end
def test_run_in_parallel_with_threads def test_run_in_parallel_with_threads
exercise_parallelization_regardless_of_machine_core_count(with: :threads) exercise_parallelization_regardless_of_machine_core_count(with: :threads, transactional_fixtures: false)
file_name = create_parallel_threads_test_file file_name = create_parallel_threads_test_file
@ -1390,7 +1390,7 @@ class ParallelTest < ActiveSupport::TestCase
RUBY RUBY
end end
def exercise_parallelization_regardless_of_machine_core_count(with:, threshold: 0) def exercise_parallelization_regardless_of_machine_core_count(with:, threshold: 0, transactional_fixtures: true)
file_content = ERB.new(<<-ERB, trim_mode: "-").result_with_hash(with: with.to_s) file_content = ERB.new(<<-ERB, trim_mode: "-").result_with_hash(with: with.to_s)
ENV["RAILS_ENV"] ||= "test" ENV["RAILS_ENV"] ||= "test"
require_relative "../config/environment" require_relative "../config/environment"
@ -1399,6 +1399,7 @@ def exercise_parallelization_regardless_of_machine_core_count(with:, threshold:
class ActiveSupport::TestCase class ActiveSupport::TestCase
# Run tests in parallel with specified workers # Run tests in parallel with specified workers
parallelize(workers: 2, with: :<%= with %>, threshold: #{threshold}) parallelize(workers: 2, with: :<%= with %>, threshold: #{threshold})
self.use_transactional_tests = #{transactional_fixtures}
# Setup all fixtures in test/fixtures/*.yml for all tests in alphabetical order. # Setup all fixtures in test/fixtures/*.yml for all tests in alphabetical order.
fixtures :all fixtures :all