Implement Active Job enqueue_after_transaction_commit

A fairly common mistake with Rails is to enqueue a job from inside a
transaction, with a record as argumemnt, which then lead to a RecordNotFound
error when picked up by the queue.

This is even one of the arguments advanced for job runners backed by the
database such as `solid_queue`, `delayed_job` or `good_job`.

But relying on this is undesirable in my opinion as it makes the Active Job
abstraction leaky, and if in the future you need to migrate to another backend
or even just move the queue to a separate database, you may experience a lot of
race conditions of the sort.

To resolve this problem globally, we can make Active Job optionally transaction
aware, and automatically defer job queueing to `after_commit`.

Co-Authored-By: Cristian Bica <cristian.bica@gmail.com>
This commit is contained in:
Jean Boussier 2024-03-27 12:38:49 +01:00
parent 0e0da316ca
commit e922c59207
24 changed files with 237 additions and 15 deletions

@ -55,10 +55,6 @@ Rails/IndexWith:
Style/AndOr:
Enabled: true
# Align `when` with `case`.
Layout/CaseIndentation:
Enabled: true
Layout/ClosingHeredocIndentation:
Enabled: true

@ -1,3 +1,30 @@
* Make Active Job transaction aware when used conjointly with Active Record.
A common mistake with Active Job is to enqueue jobs from inside a transaction,
causing them to potentially be picked and ran by another process, before the
transaction is committed, which result in various errors.
```ruby
Topic.transaction do
topic = Topic.create(...)
NewTopicNotificationJob.perform_later(topic)
end
```
Now Active Job will automatically defer the enqueuing to after the transaction is committed,
and drop the job if the transaction is rolled back.
Various queue implementations can chose to disable this behavior, and users can disable it,
or force it on a per job basis:
```ruby
class NewTopicNotificationJob < ApplicationJob
self.enqueue_after_transaction_commit = false # or `true`
end
```
*Jean Boussier*, *Cristian Bica*
* Do not trigger immediate loading of `ActiveJob::Base` when loading `ActiveJob::TestHelper`.
*Maxime Réty*

@ -39,6 +39,7 @@ module ActiveJob
autoload :Arguments
autoload :DeserializationError, "active_job/arguments"
autoload :SerializationError, "active_job/arguments"
autoload :EnqueueAfterTransactionCommit
eager_autoload do
autoload :Serializers

@ -0,0 +1,38 @@
# frozen_string_literal: true
module ActiveJob
module EnqueueAfterTransactionCommit # :nodoc:
extend ActiveSupport::Concern
included do
##
# :singleton-method:
#
# Defines if enqueueing this job from inside an Active Record transaction
# automatically defers the enqueue to after the transaction commit.
#
# It can be set on a per job basis:
# - `:always` forces the job to be deferred.
# - `:never` forces the job to be queueed immediately
# - `:default` let the queue adapter define the behavior (recommended).
class_attribute :enqueue_after_transaction_commit, instance_accessor: false, instance_predicate: false, default: :never
around_enqueue do |job, block|
after_transaction = case job.class.enqueue_after_transaction_commit
when :always
true
when :never
false
else # :default
queue_adapter.enqueue_after_transaction_commit?
end
if after_transaction
ActiveRecord.after_all_transactions_commit(&block)
else
block.call
end
end
end
end
end

@ -53,6 +53,15 @@ module ClassMethods
# Job#arguments or false if the enqueue did not succeed.
#
# After the attempted enqueue, the job will be yielded to an optional block.
#
# If Active Job is used conjointly with Active Record, and #perform_later is called
# inside an Active Record transaction, then the enqueue is implictly defered to after
# the transaction is committed, or droped if it's rolled back. This behavior can
# be changed on a per job basis:
#
# class NotificationJob < ApplicationJob
# self.enqueue_after_transaction_commit = false
# end
def perform_later(...)
job = job_or_instantiate(...)
enqueue_result = job.enqueue

@ -114,6 +114,7 @@ module ActiveJob
module QueueAdapters
extend ActiveSupport::Autoload
autoload :AbstractAdapter
autoload :AsyncAdapter
autoload :InlineAdapter
autoload :BackburnerAdapter

@ -0,0 +1,27 @@
# frozen_string_literal: true
module ActiveJob
module QueueAdapters
# = Active Job Abstract Adapter
#
# Active Job supports multiple job queue systems. ActiveJob::QueueAdapters::AbstractAdapter
# form the abstraction layer which makes this possible.
class AbstractAdapter
# Define whether enqueuing should implictly to after commit when called from
# inside a transaction. Most adapters should return true, but some adapters
# that use the same database as Active Record and are transaction aware can return
# false to continue enqueuing jobs are part of the transaction.
def enqueue_after_transaction_commit?
true
end
def enqueue(job)
raise NotImplementedError
end
def enqueue_at(job, timestamp)
raise NotImplementedError
end
end
end
end

@ -30,7 +30,7 @@ module QueueAdapters
# The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
# jobs. Since jobs share a single thread pool, long-running jobs will block
# short-lived jobs. Fine for dev/test; bad for production.
class AsyncAdapter
class AsyncAdapter < AbstractAdapter
# See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html] for executor options.
def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)

@ -14,7 +14,7 @@ module QueueAdapters
# To use Backburner set the queue_adapter config to +:backburner+.
#
# Rails.application.config.active_job.queue_adapter = :backburner
class BackburnerAdapter
class BackburnerAdapter < AbstractAdapter
def enqueue(job) # :nodoc:
response = Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority)
job.provider_job_id = response[:id] if response.is_a?(Hash)

@ -15,7 +15,15 @@ module QueueAdapters
# To use Delayed Job, set the queue_adapter config to +:delayed_job+.
#
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
class DelayedJobAdapter < AbstractAdapter
def initialize(enqueue_after_transaction_commit: false)
@enqueue_after_transaction_commit = enqueue_after_transaction_commit
end
def enqueue_after_transaction_commit? # :nodoc:
@enqueue_after_transaction_commit
end
def enqueue(job) # :nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id

@ -10,7 +10,11 @@ module QueueAdapters
# To use the Inline set the queue_adapter config to +:inline+.
#
# Rails.application.config.active_job.queue_adapter = :inline
class InlineAdapter
class InlineAdapter < AbstractAdapter
def enqueue_after_transaction_commit? # :nodoc:
false
end
def enqueue(job) # :nodoc:
Base.execute(job.serialize)
end

@ -18,7 +18,15 @@ module QueueAdapters
# To use queue_classic set the queue_adapter config to +:queue_classic+.
#
# Rails.application.config.active_job.queue_adapter = :queue_classic
class QueueClassicAdapter
class QueueClassicAdapter < AbstractAdapter
def initialize(enqueue_after_transaction_commit: false)
@enqueue_after_transaction_commit = enqueue_after_transaction_commit
end
def enqueue_after_transaction_commit? # :nodoc:
@enqueue_after_transaction_commit
end
def enqueue(job) # :nodoc:
qc_job = build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash)

@ -27,7 +27,7 @@ module QueueAdapters
# To use Resque set the queue_adapter config to +:resque+.
#
# Rails.application.config.active_job.queue_adapter = :resque
class ResqueAdapter
class ResqueAdapter < AbstractAdapter
def enqueue(job) # :nodoc:
JobWrapper.instance_variable_set(:@queue, job.queue_name)
Resque.enqueue_to job.queue_name, JobWrapper, job.serialize

@ -17,7 +17,7 @@ module QueueAdapters
# To use Sidekiq set the queue_adapter config to +:sidekiq+.
#
# Rails.application.config.active_job.queue_adapter = :sidekiq
class SidekiqAdapter
class SidekiqAdapter < AbstractAdapter
def enqueue(job) # :nodoc:
job.provider_job_id = JobWrapper.set(
wrapped: job.class,

@ -17,7 +17,7 @@ module QueueAdapters
# To use Sneakers set the queue_adapter config to +:sneakers+.
#
# Rails.application.config.active_job.queue_adapter = :sneakers
class SneakersAdapter
class SneakersAdapter < AbstractAdapter
def initialize
@monitor = Monitor.new
end

@ -17,7 +17,7 @@ module QueueAdapters
# To use Sucker Punch set the queue_adapter config to +:sucker_punch+.
#
# Rails.application.config.active_job.queue_adapter = :sucker_punch
class SuckerPunchAdapter
class SuckerPunchAdapter < AbstractAdapter
def enqueue(job) # :nodoc:
if JobWrapper.respond_to?(:perform_async)
# sucker_punch 2.0 API

@ -11,10 +11,18 @@ module QueueAdapters
# To use the test adapter set +queue_adapter+ config to +:test+.
#
# Rails.application.config.active_job.queue_adapter = :test
class TestAdapter
attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject, :queue, :at)
class TestAdapter < AbstractAdapter
attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject, :queue, :at, :enqueue_after_transaction_commit)
attr_writer(:enqueued_jobs, :performed_jobs)
def initialize(enqueue_after_transaction_commit: true)
@enqueue_after_transaction_commit = enqueue_after_transaction_commit
end
def enqueue_after_transaction_commit? # :nodoc:
@enqueue_after_transaction_commit
end
# Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
def enqueued_jobs
@enqueued_jobs ||= []

@ -25,6 +25,20 @@ class Railtie < Rails::Railtie # :nodoc:
end
end
initializer "active_job.enqueue_after_transaction_commit" do |app|
if config.active_job.key?(:enqueue_after_transaction_commit)
enqueue_after_transaction_commit = config.active_job.delete(:enqueue_after_transaction_commit)
ActiveSupport.on_load(:active_record) do
ActiveSupport.on_load(:active_job) do
include EnqueueAfterTransactionCommit
ActiveJob::Base.enqueue_after_transaction_commit = enqueue_after_transaction_commit
end
end
end
end
initializer "active_job.set_configs" do |app|
options = app.config.active_job
options.queue_adapter ||= :async

@ -1,6 +1,9 @@
# frozen_string_literal: true
require "helper"
require "active_job/queue_adapters/delayed_job_adapter"
require "jobs/disable_log_job"
require "jobs/hello_job"
class DelayedJobAdapterTest < ActiveSupport::TestCase
test "does not log arguments when log_arguments is set to false on a job" do

@ -5,11 +5,13 @@
module ActiveJob
module QueueAdapters
class StubOneAdapter
def enqueue_after_transaction_commit?; false; end
def enqueue(*); end
def enqueue_at(*); end
end
class StubTwoAdapter
def enqueue_after_transaction_commit?; false; end
def enqueue(*); end
def enqueue_at(*); end
end
@ -59,6 +61,7 @@ class QueueAdapterTest < ActiveJob::TestCase
module StubThreeAdapter
class << self
def enqueue_after_transaction_commit?; false; end
def enqueue(*); end
def enqueue_at(*); end
end
@ -71,6 +74,7 @@ def enqueue_at(*); end
end
class StubFourAdapter
def enqueue_after_transaction_commit?; false; end
def enqueue(*); end
def enqueue_at(*); end
def queue_adapter_name

@ -7,6 +7,10 @@ class << self
end
self.should_raise_sequence = []
def enqueue_after_transaction_commit?
false
end
def enqueue(*)
raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise?
end

@ -60,6 +60,7 @@ Below are the default values associated with each target version. In cases of co
#### Default Values for Target Version 7.2
- [`config.active_job.enqueue_after_transaction_commit`](#config-active-job-enqueue-after-transaction-commit): `:default`
- [`config.active_record.automatically_invert_plural_associations`](#config-active-record-automatically-invert-plural-associations): `true`
- [`config.active_record.validate_migration_timestamps`](#config-active-record-validate-migration-timestamps): `true`
- [`config.active_storage.web_image_content_types`](#config-active-storage-web-image-content-types): `%w[image/png image/jpeg image/gif image/webp]`
@ -2762,6 +2763,48 @@ class EncoderJob < ActiveJob::Base
end
```
#### `config.active_job.enqueue_after_transaction_commit`
Controls whether Active Job's `#perform_later` and similar methods automatically defer
the job queuing to after the current Active Record transaction is committed.
It can be set to `:never` to never defer the enqueue, to `:always` always defer
the enqueue, or to `:default` to let the queue adapter define if it should be defered
or not. Active Job backends that use the same database than Active Record as a queue,
should generally prevent the deferring, and others should allow it.
Example:
```ruby
Topic.transaction do
topic = Topic.create(title: "New Topic")
NewTopicNotificationJob.perform_later(topic)
end
```
In this example, if the configuration is set to `:never`, the job will
be enqueued immediately, even thought the `Topic` hasn't been committed yet.
Because of this, if the job is picked up almost emmediately, or if the
transaction doesn't succeed for some reason, the job will fail to find this
topic in the database.
If it's set to `:always`, the job will be actually enqueued after the
transaction has been committed. If the transaction is rolled back, the job
won't be enqueued at all.
This configuration can additionally be set on a per job class basis:
```ruby
class SomeJob < ApplicationJob
self.enqueue_after_transaction_commit = :never
end
```
| Starting with version | The default value is |
| --------------------- | -------------------- |
| (original) | `:never` |
| 7.2 | `:default` |
#### `config.active_job.logger`
Accepts a logger conforming to the interface of Log4r or the default Ruby Logger class, which is then used to log information from Active Job. You can retrieve this logger by calling `logger` on either an Active Job class or an Active Job instance. Set to `nil` to disable logging.

@ -322,6 +322,10 @@ def load_defaults(target_version)
when "7.2"
load_defaults "7.1"
if respond_to?(:active_job)
active_job.enqueue_after_transaction_commit = :default
end
if respond_to?(:active_storage)
active_storage.web_image_content_types = %w( image/png image/jpeg image/gif image/webp )
end

@ -9,6 +9,29 @@
# Read the Guide for Upgrading Ruby on Rails for more info on each option.
# https://guides.rubyonrails.org/upgrading_ruby_on_rails.html
###
# Controls whether Active Job's `#perform_later` and similar methods automatically defer
# the job queuing to after the current Active Record transaction is committed.
#
# Example:
# Topic.transaction do
# topic = Topic.create(...)
# NewTopicNotificationJob.perform_later(topic)
# end
#
# In this example, if `enqueue_after_transaction_commit` is `false` the job will
# be enqueued immediately, even thought the `Topic` hasn't been committed yet.
# Because of this, if the job is picked up almost emmediately, it will fail to
# find this topic in the databse.
# With `enqueue_after_transaction_commit = true`, the job will be actually enqueued
# after the transaction has been committed.
#
# Note: Active Job backends can disable this feature. This is generally used by
# backends that use the same database than Active Record as a queue, hence they
# don't need this feature.
#++
# Rails.application.config.active_job.enqueue_after_transaction_commit = :default
###
# Adds image/webp to the list of content types Active Storage considers as an image
# Prevents automatic conversion to a fallback PNG, and assumes clients support WebP, as they support gif, jpeg, and png.