Wrap evaluation of db/seeds.rb with the executor
Before #34953, when using the `:async` Active Job queue adapter, jobs enqueued in `db/seeds.rb`, such as Active Storage analysis jobs, would cause a hang (see #34939). Therefore, #34953 changed all jobs enqueued in `db/seeds.rb` to use the `:inline` queue adapter instead. (This behavior was later limited to only take effect when the `:async` adapter was configured, see #35905.) However, inline jobs in `db/seeds.rb` cleared `CurrentAttributes` values (see #37526). Therefore, #37568 changed the `:inline` adapter to wrap each job in its own thread, for isolation. However, wrapping a job in its own thread affects which database connection it uses. Thus inline jobs can no longer execute within the calling thread's database transaction, including seeing any uncommitted changes. Additionally, if the calling thread is not wrapped with the executor, the inline job thread (which is wrapped with the executor) can deadlock on the load interlock. And when testing (with `connection_pool.lock_thread = true`), the inline job thread can deadlock on one of the locks added by #28083. Therefore, this commit reverts the solutions of #34953 and #37568, and instead wraps evaluation of `db/seeds.rb` with the executor. This eliminates the original hang from #34939, which was also due to running multiple threads and not wrapping all of them with the executor. And, because nested calls to `executor.wrap` are ignored, any inline jobs in `db/seeds.rb` will not clear `CurrentAttributes` values. Alternative fix for #34939. Reverts #34953. Reverts #35905. Partially reverts #35896. Alternative fix for #37526. Reverts #37568. Fixes #40552.
This commit is contained in:
parent
8435250167
commit
648da12519
@ -12,7 +12,7 @@ module QueueAdapters
|
||||
# Rails.application.config.active_job.queue_adapter = :inline
|
||||
class InlineAdapter
|
||||
def enqueue(job) #:nodoc:
|
||||
Thread.new { Base.execute(job.serialize) }.join
|
||||
Base.execute(job.serialize)
|
||||
end
|
||||
|
||||
def enqueue_at(*) #:nodoc:
|
||||
|
@ -4,7 +4,6 @@
|
||||
require "jobs/logging_job"
|
||||
require "jobs/hello_job"
|
||||
require "jobs/provider_jid_job"
|
||||
require "jobs/thread_job"
|
||||
require "active_support/core_ext/numeric/time"
|
||||
|
||||
class QueuingTest < ActiveSupport::TestCase
|
||||
@ -146,21 +145,4 @@ class QueuingTest < ActiveSupport::TestCase
|
||||
assert job_executed "#{@id}.2"
|
||||
assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
|
||||
end
|
||||
|
||||
test "inline jobs run on separate threads" do
|
||||
skip unless adapter_is?(:inline)
|
||||
|
||||
after_job_thread = Thread.new do
|
||||
ThreadJob.latch.wait
|
||||
assert_nil Thread.current[:job_ran]
|
||||
assert ThreadJob.thread[:job_ran]
|
||||
ThreadJob.test_latch.count_down
|
||||
end
|
||||
|
||||
ThreadJob.perform_later
|
||||
|
||||
after_job_thread.join
|
||||
|
||||
assert_nil Thread.current[:job_ran]
|
||||
end
|
||||
end
|
||||
|
@ -1,22 +0,0 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
class ThreadJob < ActiveJob::Base
|
||||
class << self
|
||||
attr_accessor :thread
|
||||
|
||||
def latch
|
||||
@latch ||= Concurrent::CountDownLatch.new
|
||||
end
|
||||
|
||||
def test_latch
|
||||
@test_latch ||= Concurrent::CountDownLatch.new
|
||||
end
|
||||
end
|
||||
|
||||
def perform
|
||||
Thread.current[:job_ran] = true
|
||||
self.class.thread = Thread.current
|
||||
self.class.latch.count_down
|
||||
self.class.test_latch.wait
|
||||
end
|
||||
end
|
@ -2,6 +2,7 @@
|
||||
|
||||
require "rails/railtie"
|
||||
require "rails/engine/railties"
|
||||
require "active_support/callbacks"
|
||||
require "active_support/core_ext/module/delegation"
|
||||
require "active_support/core_ext/object/try"
|
||||
require "pathname"
|
||||
@ -422,6 +423,9 @@ def find(path)
|
||||
end
|
||||
end
|
||||
|
||||
include ActiveSupport::Callbacks
|
||||
define_callbacks :load_seed
|
||||
|
||||
delegate :middleware, :root, :paths, to: :config
|
||||
delegate :engine_name, :isolated?, to: :class
|
||||
|
||||
@ -559,13 +563,7 @@ def config
|
||||
# Blog::Engine.load_seed
|
||||
def load_seed
|
||||
seed_file = paths["db/seeds.rb"].existent.first
|
||||
return unless seed_file
|
||||
|
||||
if config.try(:active_job)&.queue_adapter == :async
|
||||
with_inline_jobs { load(seed_file) }
|
||||
else
|
||||
load(seed_file)
|
||||
end
|
||||
run_callbacks(:load_seed) { load(seed_file) } if seed_file
|
||||
end
|
||||
|
||||
initializer :load_environment_config, before: :load_environment_hook, group: :all do
|
||||
@ -637,6 +635,12 @@ def load_seed
|
||||
end
|
||||
end
|
||||
|
||||
initializer :wrap_executor_around_load_seed do |app|
|
||||
self.class.set_callback(:load_seed, :around) do |engine, seeds_block|
|
||||
app.executor.wrap(&seeds_block)
|
||||
end
|
||||
end
|
||||
|
||||
initializer :engines_blank_point do
|
||||
# We need this initializer so all extra initializers added in engines are
|
||||
# consistently executed after all the initializers above across all engines.
|
||||
@ -678,18 +682,6 @@ def load_config_initializer(initializer) # :doc:
|
||||
end
|
||||
end
|
||||
|
||||
def with_inline_jobs
|
||||
queue_adapter = config.active_job.queue_adapter
|
||||
ActiveSupport.on_load(:active_job) do
|
||||
self.queue_adapter = :inline
|
||||
end
|
||||
yield
|
||||
ensure
|
||||
ActiveSupport.on_load(:active_job) do
|
||||
self.queue_adapter = queue_adapter
|
||||
end
|
||||
end
|
||||
|
||||
def has_migrations?
|
||||
paths["db/migrate"].existent.any?
|
||||
end
|
||||
|
@ -1,6 +1,7 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "rails/initializable"
|
||||
require "active_support/descendants_tracker"
|
||||
require "active_support/inflector"
|
||||
require "active_support/core_ext/module/introspection"
|
||||
require "active_support/core_ext/module/delegation"
|
||||
@ -135,6 +136,7 @@ module Rails
|
||||
class Railtie
|
||||
autoload :Configuration, "rails/railtie/configuration"
|
||||
|
||||
extend ActiveSupport::DescendantsTracker
|
||||
include Initializable
|
||||
|
||||
ABSTRACT_RAILTIES = %w(Rails::Railtie Rails::Engine Rails::Application)
|
||||
@ -144,13 +146,7 @@ class << self
|
||||
delegate :config, to: :instance
|
||||
|
||||
def subclasses
|
||||
@subclasses ||= []
|
||||
end
|
||||
|
||||
def inherited(base)
|
||||
unless base.abstract_railtie?
|
||||
subclasses << base
|
||||
end
|
||||
super.reject(&:abstract_railtie?)
|
||||
end
|
||||
|
||||
def rake_tasks(&blk)
|
||||
|
@ -879,29 +879,40 @@ def index
|
||||
assert Bukkits::Engine.config.bukkits_seeds_loaded
|
||||
end
|
||||
|
||||
test "jobs are ran inline while loading seeds with async adapter configured" do
|
||||
test "loading seed data is wrapped by the executor" do
|
||||
app_file "db/seeds.rb", <<-RUBY
|
||||
Rails.application.config.seed_queue_adapter = ActiveJob::Base.queue_adapter
|
||||
Rails.application.config.seeding_wrapped_by_executor = Rails.application.executor.active?
|
||||
RUBY
|
||||
|
||||
boot_rails
|
||||
Rails.application.load_seed
|
||||
|
||||
assert_instance_of ActiveJob::QueueAdapters::InlineAdapter, Rails.application.config.seed_queue_adapter
|
||||
assert_instance_of ActiveJob::QueueAdapters::AsyncAdapter, ActiveJob::Base.queue_adapter
|
||||
assert_predicate Rails.application.config, :seeding_wrapped_by_executor
|
||||
end
|
||||
|
||||
test "jobs are ran with original adapter while loading seeds with custom adapter configured" do
|
||||
test "inline jobs do not clear CurrentAttributes when loading seed data" do
|
||||
app_file "db/seeds.rb", <<-RUBY
|
||||
Rails.application.config.seed_queue_adapter = ActiveJob::Base.queue_adapter
|
||||
class SeedsAttributes < ActiveSupport::CurrentAttributes
|
||||
attribute :foo
|
||||
end
|
||||
|
||||
class SeedsJob < ActiveJob::Base
|
||||
self.queue_adapter = :inline
|
||||
def perform
|
||||
Rails.application.config.seeds_job_ran = true
|
||||
end
|
||||
end
|
||||
|
||||
SeedsAttributes.foo = 42
|
||||
SeedsJob.perform_later
|
||||
Rails.application.config.seeds_attributes_foo = SeedsAttributes.foo
|
||||
RUBY
|
||||
|
||||
boot_rails
|
||||
Rails.application.config.active_job.queue_adapter = :delayed_job
|
||||
Rails.application.load_seed
|
||||
|
||||
assert_instance_of ActiveJob::QueueAdapters::DelayedJobAdapter, Rails.application.config.seed_queue_adapter
|
||||
assert_instance_of ActiveJob::QueueAdapters::DelayedJobAdapter, ActiveJob::Base.queue_adapter
|
||||
assert Rails.application.config.seeds_job_ran
|
||||
assert_equal 42, Rails.application.config.seeds_attributes_foo
|
||||
end
|
||||
|
||||
test "seed data can be loaded when ActiveJob is not present" do
|
||||
|
Loading…
Reference in New Issue
Block a user