Allow to register transaction callbacks outside of a record
Ref: https://github.com/rails/rails/pull/26103 Ref: https://github.com/rails/rails/pull/51426 A fairly common mistake with Rails is to enqueue a job from inside a transaction, and a record as argumemnt, which then lead to a RecordNotFound error when picked up by the queue. This is even one of the arguments advanced for job runners backed by the database such as `solid_queue`, `delayed_job` or `good_job`. But relying on this is undesirable iin my opinion as it makes the Active Job abstraction leaky, and if in the future you need to migrate to another backend or even just move the queue to a separate database, you may experience a lot of race conditions of the sort. But more generally, being able to defer work to after the current transaction has been a missing feature of Active Record. Right now the only way to do it is from a model callback, and this forces moving things in Active Record models that sometimes are better done elsewhere. Even as a self-proclaimed "service object skeptic", I often wanted this capability over the last decade, and I'm sure it got asked or desired by many more people. Also there's some 3rd party gems adding this capability using monkey patches. It's not a reason to upstream the capability, but it's a proof that there is demand for it. Implementation wise, this proof of concept shows that it's not really hard to implement, even with nested multi-db transactions support. Co-Authored-By: Cristian Bica <cristian.bica@gmail.com>
This commit is contained in:
parent
eac95d531d
commit
c2df237414
@ -1,3 +1,51 @@
|
||||
* `ActiveRecord::Base.transaction` now yields an `ActiveRecord::Transation` object.
|
||||
|
||||
This allows to register callbacks on it.
|
||||
|
||||
```ruby
|
||||
Article.transaction do |transaction|
|
||||
article.update(published: true)
|
||||
transaction.after_commit do
|
||||
PublishNotificationMailer.with(article: article).deliver_later
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
*Jean Boussier*
|
||||
|
||||
* Add `ActiveRecord::Base.current_transaction`.
|
||||
|
||||
Returns the current transaction, to allow registering callbacks on it.
|
||||
|
||||
```ruby
|
||||
Article.current_transaction.after_commit do
|
||||
PublishNotificationMailer.with(article: article).deliver_later
|
||||
end
|
||||
```
|
||||
|
||||
*Jean Boussier*
|
||||
|
||||
* Add `ActiveRecord.after_all_transactions_commit` callback.
|
||||
|
||||
Useful for code that may run either inside or outside a transaction and need
|
||||
to perform works after the state changes have been properly peristed.
|
||||
|
||||
```ruby
|
||||
def publish_article(article)
|
||||
article.update(published: true)
|
||||
ActiveRecord.after_all_transactions_commit do
|
||||
PublishNotificationMailer.with(article: article).deliver_later
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
In the above example, the block is either executed immediately if called outside
|
||||
of a transaction, or called after the open transaction is committed.
|
||||
|
||||
If the transaction is rolled back, the block isn't called.
|
||||
|
||||
*Jean Boussier*
|
||||
|
||||
* Add the ability to ignore counter cache columns until they are backfilled
|
||||
|
||||
Starting to use counter caches on existing large tables can be troublesome, because the column
|
||||
|
@ -86,6 +86,7 @@ module ActiveRecord
|
||||
autoload :Timestamp
|
||||
autoload :TokenFor
|
||||
autoload :TouchLater
|
||||
autoload :Transaction
|
||||
autoload :Transactions
|
||||
autoload :Translation
|
||||
autoload :Validations
|
||||
@ -540,6 +541,51 @@ def self.eager_load!
|
||||
def self.disconnect_all!
|
||||
ConnectionAdapters::PoolConfig.disconnect_all!
|
||||
end
|
||||
|
||||
# Registers a block to be called after all the current transactions have been
|
||||
# committed.
|
||||
#
|
||||
# If there is no currently open transaction, the block is called immediately.
|
||||
#
|
||||
# If there are multiple nested transactions, the block is called after the outermost one
|
||||
# has been committed,
|
||||
#
|
||||
# If any of the currently open transactions is rolled back, the block is never called.
|
||||
#
|
||||
# If multiple transactions are open across multiple databases, the block will be invoked
|
||||
# if and once all of them have been committed. But note that nesting transactions across
|
||||
# two distinct databases is a sharding anti-pattern that comes with a world of hurts.
|
||||
def self.after_all_transactions_commit(&block)
|
||||
open_transactions = all_open_transactions
|
||||
|
||||
if open_transactions.empty?
|
||||
yield
|
||||
elsif open_transactions.size == 1
|
||||
open_transactions.first.after_commit(&block)
|
||||
else
|
||||
count = open_transactions.size
|
||||
callback = -> do
|
||||
count -= 1
|
||||
block.call if count.zero?
|
||||
end
|
||||
open_transactions.each do |t|
|
||||
t.after_commit(&callback)
|
||||
end
|
||||
open_transactions = nil # rubocop:disable Lint/UselessAssignment avoid holding it in the closure
|
||||
end
|
||||
end
|
||||
|
||||
def self.all_open_transactions # :nodoc:
|
||||
open_transactions = []
|
||||
Base.connection_handler.each_connection_pool do |pool|
|
||||
if active_connection = pool.active_connection
|
||||
if active_connection.current_transaction.open? && active_connection.current_transaction.joinable?
|
||||
open_transactions << active_connection.current_transaction
|
||||
end
|
||||
end
|
||||
end
|
||||
open_transactions
|
||||
end
|
||||
end
|
||||
|
||||
ActiveSupport.on_load(:active_record) do
|
||||
|
@ -235,6 +235,17 @@ def truncate_tables(*table_names) # :nodoc:
|
||||
# Runs the given block in a database transaction, and returns the result
|
||||
# of the block.
|
||||
#
|
||||
# == Transaction callbacks
|
||||
#
|
||||
# #transaction yields an ActiveRecord::Transaction object on which it is
|
||||
# possible to register callback:
|
||||
#
|
||||
# ActiveRecord::Base.transaction do |transaction|
|
||||
# transaction.before_commit { puts "before commit!" }
|
||||
# transaction.after_commit { puts "after commit!" }
|
||||
# transaction.after_rollback { puts "after rollback!" }
|
||||
# end
|
||||
#
|
||||
# == Nested transactions support
|
||||
#
|
||||
# #transaction calls can be nested. By default, this makes all database
|
||||
@ -345,7 +356,7 @@ def transaction(requires_new: nil, isolation: nil, joinable: true, &block)
|
||||
if isolation
|
||||
raise ActiveRecord::TransactionIsolationError, "cannot set isolation when joining a transaction"
|
||||
end
|
||||
yield
|
||||
yield current_transaction
|
||||
else
|
||||
transaction_manager.within_new_transaction(isolation: isolation, joinable: joinable, &block)
|
||||
end
|
||||
|
@ -116,15 +116,19 @@ def dirty!; end
|
||||
def invalidated?; false; end
|
||||
def invalidate!; end
|
||||
def materialized?; false; end
|
||||
def before_commit; yield; end
|
||||
def after_commit; yield; end
|
||||
def after_rollback; end # noop
|
||||
end
|
||||
|
||||
class Transaction # :nodoc:
|
||||
class Transaction < ActiveRecord::Transaction # :nodoc:
|
||||
attr_reader :connection, :state, :savepoint_name, :isolation_level
|
||||
attr_accessor :written
|
||||
|
||||
delegate :invalidate!, :invalidated?, to: :@state
|
||||
|
||||
def initialize(connection, isolation: nil, joinable: true, run_commit_callbacks: false)
|
||||
super()
|
||||
@connection = connection
|
||||
@state = TransactionState.new
|
||||
@records = nil
|
||||
@ -191,60 +195,76 @@ def restore!
|
||||
end
|
||||
|
||||
def rollback_records
|
||||
return unless records
|
||||
if records
|
||||
begin
|
||||
ite = unique_records
|
||||
|
||||
ite = unique_records
|
||||
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
|
||||
|
||||
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
|
||||
|
||||
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
|
||||
record.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: should_run_callbacks)
|
||||
end
|
||||
ensure
|
||||
ite&.each do |i|
|
||||
i.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: false)
|
||||
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
|
||||
record.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: should_run_callbacks)
|
||||
end
|
||||
ensure
|
||||
ite&.each do |i|
|
||||
i.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: false)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@callbacks&.each(&:after_rollback)
|
||||
end
|
||||
|
||||
def before_commit_records
|
||||
return unless records
|
||||
|
||||
if @run_commit_callbacks
|
||||
if ActiveRecord.before_committed_on_all_records
|
||||
ite = unique_records
|
||||
if records
|
||||
if ActiveRecord.before_committed_on_all_records
|
||||
ite = unique_records
|
||||
|
||||
instances_to_run_callbacks_on = records.each_with_object({}) do |record, candidates|
|
||||
candidates[record] = record
|
||||
end
|
||||
instances_to_run_callbacks_on = records.each_with_object({}) do |record, candidates|
|
||||
candidates[record] = record
|
||||
end
|
||||
|
||||
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
|
||||
record.before_committed! if should_run_callbacks
|
||||
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
|
||||
record.before_committed! if should_run_callbacks
|
||||
end
|
||||
else
|
||||
records.uniq.each(&:before_committed!)
|
||||
end
|
||||
else
|
||||
records.uniq.each(&:before_committed!)
|
||||
end
|
||||
|
||||
@callbacks&.each(&:before_commit)
|
||||
end
|
||||
# Note: When @run_commit_callbacks is false #commit_records takes care of appending
|
||||
# remaining callbacks to the parent transaction
|
||||
end
|
||||
|
||||
def commit_records
|
||||
return unless records
|
||||
if records
|
||||
begin
|
||||
ite = unique_records
|
||||
|
||||
ite = unique_records
|
||||
if @run_commit_callbacks
|
||||
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
|
||||
|
||||
if @run_commit_callbacks
|
||||
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
|
||||
|
||||
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
|
||||
record.committed!(should_run_callbacks: should_run_callbacks)
|
||||
end
|
||||
else
|
||||
while record = ite.shift
|
||||
# if not running callbacks, only adds the record to the parent transaction
|
||||
connection.add_transaction_record(record)
|
||||
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
|
||||
record.committed!(should_run_callbacks: should_run_callbacks)
|
||||
end
|
||||
else
|
||||
while record = ite.shift
|
||||
# if not running callbacks, only adds the record to the parent transaction
|
||||
connection.add_transaction_record(record)
|
||||
end
|
||||
end
|
||||
ensure
|
||||
ite&.each { |i| i.committed!(should_run_callbacks: false) }
|
||||
end
|
||||
end
|
||||
ensure
|
||||
ite&.each { |i| i.committed!(should_run_callbacks: false) }
|
||||
|
||||
if @run_commit_callbacks
|
||||
@callbacks&.each(&:after_commit)
|
||||
elsif @callbacks
|
||||
connection.current_transaction.append_callbacks(@callbacks)
|
||||
end
|
||||
end
|
||||
|
||||
def full_rollback?; true; end
|
||||
@ -533,7 +553,7 @@ def within_new_transaction(isolation: nil, joinable: true)
|
||||
@connection.lock.synchronize do
|
||||
transaction = begin_transaction(isolation: isolation, joinable: joinable)
|
||||
begin
|
||||
yield
|
||||
yield transaction
|
||||
rescue Exception => error
|
||||
rollback_transaction
|
||||
after_failure_actions(transaction, error)
|
||||
@ -573,7 +593,7 @@ def current_transaction
|
||||
end
|
||||
|
||||
private
|
||||
NULL_TRANSACTION = NullTransaction.new
|
||||
NULL_TRANSACTION = NullTransaction.new.freeze
|
||||
|
||||
# Deallocate invalidated prepared statements outside of the transaction
|
||||
def after_failure_actions(transaction, error)
|
||||
|
68
activerecord/lib/active_record/transaction.rb
Normal file
68
activerecord/lib/active_record/transaction.rb
Normal file
@ -0,0 +1,68 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module ActiveRecord
|
||||
class Transaction
|
||||
class Callback # :nodoc:
|
||||
def initialize(event, callback)
|
||||
@event = event
|
||||
@callback = callback
|
||||
end
|
||||
|
||||
def before_commit
|
||||
@callback.call if @event == :before_commit
|
||||
end
|
||||
|
||||
def after_commit
|
||||
@callback.call if @event == :after_commit
|
||||
end
|
||||
|
||||
def after_rollback
|
||||
@callback.call if @event == :after_rollback
|
||||
end
|
||||
end
|
||||
|
||||
def initialize # :nodoc:
|
||||
@callbacks = nil
|
||||
end
|
||||
|
||||
# Registers a block to be called before the current transaction is fully committed.
|
||||
#
|
||||
# If there is no currently open transactions, the block is called immediately.
|
||||
#
|
||||
# If the current transaction has a parent transaction, the callback is transfered to
|
||||
# the parent when the current transaction commits, or dropped when the current transaction
|
||||
# is rolled back. This operation is repeated until the outermost transaction is reached.
|
||||
def before_commit(&block)
|
||||
(@callbacks ||= []) << Callback.new(:before_commit, block)
|
||||
end
|
||||
|
||||
# Registers a block to be called after the current transaction is fully committed.
|
||||
#
|
||||
# If there is no currently open transactions, the block is called immediately.
|
||||
#
|
||||
# If the current transaction has a parent transaction, the callback is transfered to
|
||||
# the parent when the current transaction commits, or dropped when the current transaction
|
||||
# is rolled back. This operation is repeated until the outermost transaction is reached.
|
||||
def after_commit(&block)
|
||||
(@callbacks ||= []) << Callback.new(:after_commit, block)
|
||||
end
|
||||
|
||||
# Registers a block to be called after the current transaction is rolled back.
|
||||
#
|
||||
# If there is no currently open transactions, the block is never called.
|
||||
#
|
||||
# If the current transaction is successfully committed but has a parent
|
||||
# transaction, the callback is automatically added to the parent transaction.
|
||||
#
|
||||
# If the entire chain of nested transactions are all successfully committed,
|
||||
# the block is never called.
|
||||
def after_rollback(&block)
|
||||
(@callbacks ||= []) << Callback.new(:after_rollback, block)
|
||||
end
|
||||
|
||||
protected
|
||||
def append_callbacks(callbacks)
|
||||
(@callbacks ||= []).concat(callbacks)
|
||||
end
|
||||
end
|
||||
end
|
@ -224,6 +224,11 @@ def transaction(**options, &block)
|
||||
end
|
||||
end
|
||||
|
||||
# Returns the current transaction. See ActiveRecord::Transactions API docs.
|
||||
def current_transaction
|
||||
connection_pool.active_connection&.current_transaction || ConnectionAdapters::NULL_TRANSACTION
|
||||
end
|
||||
|
||||
def before_commit(*args, &block) # :nodoc:
|
||||
set_options_for_callbacks!(args)
|
||||
set_callback(:before_commit, :before, *args, &block)
|
||||
|
@ -19,6 +19,182 @@ def setup
|
||||
@first, @second = Topic.find(1, 2).sort_by(&:id)
|
||||
end
|
||||
|
||||
def test_after_all_transactions_committ
|
||||
called = 0
|
||||
ActiveRecord.after_all_transactions_commit { called += 1 }
|
||||
assert_equal 1, called
|
||||
|
||||
ActiveRecord.after_all_transactions_commit { called += 1 }
|
||||
assert_equal 2, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
ActiveRecord.after_all_transactions_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.transaction(requires_new: true) do
|
||||
ActiveRecord.after_all_transactions_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
ActiveRecord.after_all_transactions_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
assert_equal 0, called
|
||||
end
|
||||
|
||||
def test_after_current_transaction_commit_multidb_nested_transactions
|
||||
called = 0
|
||||
ARUnit2Model.transaction do
|
||||
Topic.transaction do
|
||||
ActiveRecord.after_all_transactions_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
end
|
||||
|
||||
def test_transaction_after_commit_callback
|
||||
called = 0
|
||||
Topic.current_transaction.after_commit { called += 1 }
|
||||
assert_equal 1, called
|
||||
|
||||
Topic.current_transaction.after_commit { called += 1 }
|
||||
assert_equal 2, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.after_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.transaction(requires_new: true) do
|
||||
Topic.current_transaction.after_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.after_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
assert_equal 0, called
|
||||
|
||||
called = 0
|
||||
ARUnit2Model.transaction do
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.after_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
end
|
||||
|
||||
def test_transaction_before_commit_callback
|
||||
called = 0
|
||||
Topic.current_transaction.before_commit { called += 1 }
|
||||
assert_equal 1, called
|
||||
|
||||
Topic.current_transaction.before_commit { called += 1 }
|
||||
assert_equal 2, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.before_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.transaction(requires_new: true) do
|
||||
Topic.current_transaction.before_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.before_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
assert_equal 0, called
|
||||
|
||||
called = 0
|
||||
ARUnit2Model.transaction do
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.before_commit { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
end
|
||||
|
||||
def test_transaction_after_rollback_callback
|
||||
called = 0
|
||||
Topic.current_transaction.after_rollback { called += 1 }
|
||||
assert_equal 0, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.after_rollback { called += 1 }
|
||||
assert_equal 0, called
|
||||
end
|
||||
assert_equal 0, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.after_rollback { called += 1 }
|
||||
assert_equal 0, called
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
assert_equal 1, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
Topic.current_transaction.after_rollback { called += 1 }
|
||||
Topic.transaction(requires_new: true) do
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
end
|
||||
assert_equal 0, called
|
||||
|
||||
called = 0
|
||||
Topic.transaction do
|
||||
assert_nothing_raised do
|
||||
Topic.transaction(requires_new: true) do
|
||||
Topic.current_transaction.after_rollback { called += 1 }
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
end
|
||||
assert_equal 1, called
|
||||
end
|
||||
assert_equal 1, called
|
||||
end
|
||||
|
||||
def test_rollback_dirty_changes
|
||||
topic = topics(:fifth)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user