Allow Adapter#select_all to be performed asynchronously from a background thread pool

Sometimes a controller or a job has to perform multiple independent queries, e.g.:

```
def index
  @posts = Post.published
  @categories = Category.active
end
```

Since these two queries are totally independent, ideally you could
execute them in parallel, so that, assuming each takes 50ms, the
total query time would be 50ms rather than 100ms.

A very naive way to do this is to simply call `Relation#to_a` in a
background thread. The problem is that most Rails applications, and
even Rails itself, rely on thread-local state (`PerThreadRegistry`,
`CurrentAttributes`, etc.). So executing such a high-level interface
from another thread is likely to lead to many context-loss problems
or even thread-safety issues.

What we can do instead is schedule a much lower-level operation
(`Adapter#select_all`) in a thread pool, and return a future/promise.
This way we keep most of the risky code on the main thread, but perform
the slow IO in the background, with very little chance of executing
code that relies on state stored in thread-local storage.

Also since most users are on MRI, only the IO can really be parallelized,
so scheduling more code to be executed in background wouldn't lead
to better performance.
This commit is contained in:
Jean Boussier 2020-08-13 09:54:53 +02:00
parent c03933af23
commit 7fc174aada
20 changed files with 388 additions and 35 deletions

@ -86,6 +86,7 @@ module ActiveRecord
autoload :AttributeAssignment
autoload :AttributeMethods
autoload :AutosaveAssociation
autoload :AsynchronousQueriesTracker
autoload :LegacyYamlAdapter
@ -104,6 +105,7 @@ module ActiveRecord
end
autoload :Result
autoload :FutureResult
autoload :TableMetadata
autoload :Type
end

@ -0,0 +1,49 @@
# frozen_string_literal: true
module ActiveRecord
  # Owns the current asynchronous-queries session and exposes the
  # +run+ / +complete+ pair expected from an executor hook.
  class AsynchronousQueriesTracker # :nodoc:
    # A flag object that starts out active and becomes inactive once
    # +finalize+ has been called.
    class Session # :nodoc:
      def initialize
        @active = true
      end

      # True until +finalize+ is called.
      def active?
        @active
      end

      # Marks the session as no longer active.
      def finalize
        @active = false
      end
    end

    # Registers this class as a hook on +executor+.
    def self.install_executor_hooks(executor = ActiveSupport::Executor)
      executor.register_hook(self)
    end

    # Hook entry point: opens a new session on the tracker provided by
    # ActiveRecord::Base and returns that tracker.
    def self.run
      ActiveRecord::Base.asynchronous_queries_tracker.start_session
    end

    # Hook exit point: receives the tracker returned by +run+ and closes
    # its session.
    def self.complete(asynchronous_queries_tracker)
      asynchronous_queries_tracker.finalize_session
    end

    attr_reader :current_session

    def initialize
      @current_session = nil
    end

    # Replaces the current session with a fresh, active one.
    # Returns the tracker itself.
    def start_session
      @current_session = Session.new
      self
    end

    # Finalizes the current session, if there is one, and forgets it.
    def finalize_session
      @current_session.finalize if @current_session
      @current_session = nil
    end
  end
end

@ -407,6 +407,13 @@ def initialize(pool_config)
@lock_thread = false
@async_executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: @size,
max_queue: @size * 4,
fallback_policy: :caller_runs
)
@reaper = Reaper.new(self, db_config.reaping_frequency)
@reaper.run
end
@ -713,6 +720,10 @@ def stat
end
end
# Hands +future_result+ to the pool's async executor; the posted task
# calls +execute_or_skip+ on it.
def schedule_query(future_result) # :nodoc:
  @async_executor.post do
    future_result.execute_or_skip
  end
end
private
#--
# this is unfortunately not concurrent

@ -59,15 +59,11 @@ def cacheable_query(klass, arel) # :nodoc:
end
# Returns an ActiveRecord::Result instance.
def select_all(arel, name = nil, binds = [], preparable: nil)
def select_all(arel, name = nil, binds = [], preparable: nil, async: false)
arel = arel_from_relation(arel)
sql, binds, preparable = to_sql_and_binds(arel, binds, preparable)
if prepared_statements && preparable
select_prepared(sql, name, binds)
else
select(sql, name, binds)
end
select(sql, name, binds, prepare: prepared_statements && preparable, async: async && FutureResult::SelectAll)
rescue ::RangeError
ActiveRecord::Result.new([], [])
end
@ -528,12 +524,27 @@ def combine_multi_statements(total_sql)
end
# Returns an ActiveRecord::Result instance.
def select(sql, name = nil, binds = [])
exec_query(sql, name, binds, prepare: false)
end
def select(sql, name = nil, binds = [], prepare: false, async: false)
if async
if current_transaction.joinable?
raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions"
end
def select_prepared(sql, name = nil, binds = [])
exec_query(sql, name, binds, prepare: true)
future_result = async.new(
pool,
sql,
name,
binds,
prepare: prepare,
)
if supports_concurrent_connections? && current_transaction.closed? && ActiveRecord::Base.asynchronous_queries_session
future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
else
future_result.execute!(self)
end
return future_result
end
exec_query(sql, name, binds, prepare: prepare)
end
def sql_for_insert(sql, pk, binds)

@ -93,7 +93,7 @@ def clear_query_cache
end
end
def select_all(arel, name = nil, binds = [], preparable: nil)
def select_all(arel, name = nil, binds = [], preparable: nil, async: false)
arel = arel_from_relation(arel)
# If arel is locked this is a SELECT ... FOR UPDATE or somesuch.
@ -101,13 +101,29 @@ def select_all(arel, name = nil, binds = [], preparable: nil)
if @query_cache_enabled && !(arel.respond_to?(:locked) && arel.locked)
sql, binds, preparable = to_sql_and_binds(arel, binds, preparable)
cache_sql(sql, name, binds) { super(sql, name, binds, preparable: preparable) }
if async
lookup_sql_cache(sql, name, binds) || super(sql, name, binds, preparable: preparable, async: async)
else
cache_sql(sql, name, binds) { super(sql, name, binds, preparable: preparable, async: async) }
end
else
super
end
end
private
# Read-only query cache lookup, performed under @lock.
#
# Returns the cached entry for +sql+ / +binds+ and emits a
# "sql.active_record" notification when one exists; returns nil
# (without emitting anything) otherwise. Nothing is written to the cache.
def lookup_sql_cache(sql, name, binds)
  @lock.synchronize do
    next unless @query_cache[sql].key?(binds)

    payload = cache_notification_info(sql, name, binds)
    ActiveSupport::Notifications.instrument("sql.active_record", payload)
    @query_cache[sql][binds]
  end
end
def cache_sql(sql, name, binds)
@lock.synchronize do
result =

@ -424,6 +424,10 @@ def supports_insert_conflict_target?
false
end
# Default answer for adapters: true. #select consults this before
# scheduling an async query on the pool; when it is false the query is
# executed immediately on the calling connection instead.
def supports_concurrent_connections?
true
end
# This is meant to be implemented by the adapters that support extensions
def disable_extension(name)
end
@ -689,7 +693,7 @@ def translate_exception_class(e, sql, binds)
exception
end
def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = nil) # :doc:
def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = nil, async: false) # :doc:
@instrumenter.instrument(
"sql.active_record",
sql: sql,
@ -697,6 +701,7 @@ def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name =
binds: binds,
type_casted_binds: type_casted_binds,
statement_name: statement_name,
async: async,
connection: self) do
@lock.synchronize do
yield

@ -197,11 +197,11 @@ def clear_cache! # :nodoc:
#++
# Executes the SQL statement in the context of this connection.
def execute(sql, name = nil)
def execute(sql, name = nil, async: false)
materialize_transactions
mark_transaction_written_if_write(sql)
log(sql, name) do
log(sql, name, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@connection.query(sql)
end
@ -211,8 +211,8 @@ def execute(sql, name = nil)
# Mysql2Adapter doesn't have to free a result after using it, but we use this method
# to write stuff in an abstract way without concerning ourselves about whether it
# needs to be explicitly freed or not.
def execute_and_free(sql, name = nil) # :nodoc:
yield execute(sql, name)
def execute_and_free(sql, name = nil, async: false) # :nodoc:
yield execute(sql, name, async: async)
end
def begin_db_transaction

@ -38,7 +38,7 @@ def explain(arel, binds = [])
end
# Executes the SQL statement in the context of this connection.
def execute(sql, name = nil)
def execute(sql, name = nil, async: false)
check_if_write_query(sql)
# make sure we carry over any changes to ActiveRecord::Base.default_timezone that have been
@ -48,9 +48,9 @@ def execute(sql, name = nil)
super
end
def exec_query(sql, name = "SQL", binds = [], prepare: false)
def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false)
if without_prepared_statement?(binds)
execute_and_free(sql, name) do |result|
execute_and_free(sql, name, async: async) do |result|
if result
build_result(columns: result.fields, rows: result.to_a)
else
@ -58,7 +58,7 @@ def exec_query(sql, name = "SQL", binds = [], prepare: false)
end
end
else
exec_stmt_and_free(sql, name, binds, cache_stmt: prepare) do |_, result|
exec_stmt_and_free(sql, name, binds, cache_stmt: prepare, async: async) do |_, result|
if result
build_result(columns: result.fields, rows: result.to_a)
else
@ -146,7 +146,7 @@ def max_allowed_packet
@max_allowed_packet ||= show_variable("max_allowed_packet")
end
def exec_stmt_and_free(sql, name, binds, cache_stmt: false)
def exec_stmt_and_free(sql, name, binds, cache_stmt: false, async: false)
check_if_write_query(sql)
materialize_transactions
@ -158,7 +158,7 @@ def exec_stmt_and_free(sql, name, binds, cache_stmt: false)
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds) do
log(sql, name, binds, type_casted_binds, async: async) do
if cache_stmt
stmt = @statements[sql] ||= @connection.prepare(sql)
else

@ -47,8 +47,8 @@ def execute(sql, name = nil)
end
end
def exec_query(sql, name = "SQL", binds = [], prepare: false)
execute_and_clear(sql, name, binds, prepare: prepare) do |result|
def exec_query(sql, name = "SQL", binds = [], prepare: false, async: false)
execute_and_clear(sql, name, binds, prepare: prepare, async: async) do |result|
types = {}
fields = result.fields
fields.each_with_index do |fname, i|

@ -648,13 +648,13 @@ def load_types_queries(initializer, oids)
FEATURE_NOT_SUPPORTED = "0A000" #:nodoc:
def execute_and_clear(sql, name, binds, prepare: false)
def execute_and_clear(sql, name, binds, prepare: false, async: false)
check_if_write_query(sql)
if !prepare || without_prepared_statement?(binds)
result = exec_no_cache(sql, name, binds)
result = exec_no_cache(sql, name, binds, async: async)
else
result = exec_cache(sql, name, binds)
result = exec_cache(sql, name, binds, async: async)
end
begin
ret = yield result
@ -664,7 +664,7 @@ def execute_and_clear(sql, name, binds, prepare: false)
ret
end
def exec_no_cache(sql, name, binds)
def exec_no_cache(sql, name, binds, async: false)
materialize_transactions
mark_transaction_written_if_write(sql)
@ -673,14 +673,14 @@ def exec_no_cache(sql, name, binds)
update_typemap_for_default_timezone
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds) do
log(sql, name, binds, type_casted_binds, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@connection.exec_params(sql, type_casted_binds)
end
end
end
def exec_cache(sql, name, binds)
def exec_cache(sql, name, binds, async: false)
materialize_transactions
mark_transaction_written_if_write(sql)
update_typemap_for_default_timezone
@ -688,7 +688,7 @@ def exec_cache(sql, name, binds)
stmt_key = prepare_statement(sql, binds)
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds, stmt_key) do
log(sql, name, binds, type_casted_binds, stmt_key, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
@connection.exec_prepared(stmt_key, type_casted_binds)
end

@ -31,7 +31,7 @@ def execute(sql, name = nil) #:nodoc:
end
end
def exec_query(sql, name = nil, binds = [], prepare: false)
def exec_query(sql, name = nil, binds = [], prepare: false, async: false)
check_if_write_query(sql)
materialize_transactions
@ -39,7 +39,7 @@ def exec_query(sql, name = nil, binds = [], prepare: false)
type_casted_binds = type_casted_binds(binds)
log(sql, name, binds, type_casted_binds) do
log(sql, name, binds, type_casted_binds, async: async) do
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
# Don't cache statements if they are not prepared
unless prepare

@ -84,6 +84,7 @@ def dealloc(stmt)
end
def initialize(connection, logger, connection_options, config)
@memory_database = config[:database] == ":memory:"
super(connection, logger, config)
configure_connection
end
@ -153,6 +154,10 @@ def supports_insert_on_conflict?
alias supports_insert_on_duplicate_update? supports_insert_on_conflict?
alias supports_insert_conflict_target? supports_insert_on_conflict?
# False when the adapter was configured with the ":memory:" database
# (see @memory_database in #initialize), true otherwise.
# NOTE(review): presumably because an in-memory database is not shared
# across connections -- confirm.
def supports_concurrent_connections?
!@memory_database
end
def active?
!@connection.closed?
end

@ -194,6 +194,15 @@ def self.connection_handlers=(handlers)
@@connection_handlers = handlers
end
# Returns the current session of the calling thread's tracker, or nil
# when no session has been started (or it has been finalized).
def self.asynchronous_queries_session # :nodoc:
asynchronous_queries_tracker.current_session
end
# Returns the AsynchronousQueriesTracker stored in the current thread's
# :ar_asynchronous_queries_tracker thread variable, creating and storing
# one on first access.
def self.asynchronous_queries_tracker # :nodoc:
  thread = Thread.current
  existing = thread.thread_variable_get(:ar_asynchronous_queries_tracker)
  return existing if existing

  thread.thread_variable_set(:ar_asynchronous_queries_tracker, AsynchronousQueriesTracker.new)
end
# Returns the symbol representing the current connected role.
#
# ActiveRecord::Base.connected_to(role: :writing) do

@ -373,6 +373,11 @@ class TransactionIsolationError < ActiveRecordError
class TransactionRollbackError < StatementInvalid
end
# AsynchronousQueryInsideTransactionError will be raised when attempting
# to perform an asynchronous query from inside a transaction.
class AsynchronousQueryInsideTransactionError < ActiveRecordError
end
# SerializationFailure will be raised when a transaction is rolled
# back by the database due to a serialization failure.
class SerializationFailure < TransactionRollbackError

@ -0,0 +1,97 @@
# frozen_string_literal: true
module ActiveRecord
  # Placeholder for the outcome of a query whose execution may happen
  # either on the thread that asks for the result or on whatever thread
  # calls +execute_or_skip+ after the query was scheduled on the pool.
  #
  # The query is run at most once: the first successful attempt stores
  # either the result or the raised error and clears the pending flag.
  class FutureResult # :nodoc:
    # Raised by #result when the query was scheduled under a session that
    # is no longer active and no error was recorded.
    Canceled = Class.new(ActiveRecordError)

    delegate :empty?, :to_a, to: :result

    # +pool+ is the connection pool used for scheduling and for obtaining
    # connections; +args+ / +kwargs+ are forwarded untouched to
    # +connection.exec_query+ when the query finally runs.
    def initialize(pool, *args, **kwargs)
      @pool = pool
      @args = args
      @kwargs = kwargs
      @session = nil
      @mutex = Mutex.new
      @pending = true
      @result = nil
      @error = nil
    end

    # Ties the query to +session+ and asks the pool to schedule it.
    def schedule!(session)
      @session = session
      @pool.schedule_query(self)
    end

    # Runs the query right away on +connection+ (no session, no locking).
    def execute!(connection)
      execute_query(connection)
    end

    # Entry point for the scheduled task. Runs the query with
    # <tt>async: true</tt> unless it is no longer pending or another
    # thread currently holds the mutex (in which case it simply returns).
    def execute_or_skip
      return unless pending?

      @pool.with_connection do |connection|
        return unless @mutex.try_lock

        begin
          # Re-check now that the lock is held.
          execute_query(connection, async: true) if pending?
        ensure
          @mutex.unlock
        end
      end
    end

    # Returns the query result, running the query on the calling thread
    # first if it is still pending. Re-raises a stored error, or raises
    # Canceled when the session is no longer active.
    def result
      execute_or_wait

      raise @error if @error
      raise Canceled if canceled?

      @result
    end

    private
      # Still to be executed: not yet run, and not tied to a session that
      # has been finalized.
      def pending?
        @pending && (!@session || @session.active?)
      end

      def canceled?
        @session && !@session.active?
      end

      # Blocks on the mutex, then runs the query on the pool's current
      # connection if nobody else ran it in the meantime.
      def execute_or_wait
        return unless pending?

        @mutex.synchronize do
          execute_query(@pool.connection) if pending?
        end
      end

      # Runs the query, capturing either its result or the error it
      # raised; always clears the pending flag.
      def execute_query(connection, async: false)
        @result = exec_query(connection, *@args, **@kwargs, async: async)
      rescue => error
        @error = error
      ensure
        @pending = false
      end

      def exec_query(connection, *args, **kwargs)
        connection.exec_query(*args, **kwargs)
      end

      # Variant that turns a ::RangeError raised while executing into an
      # empty ActiveRecord::Result.
      class SelectAll < FutureResult # :nodoc:
        private
          def exec_query(*, **)
            super
          rescue ::RangeError
            ActiveRecord::Result.new([], [])
          end
      end
  end
end

@ -39,6 +39,7 @@ def sql(event)
name = "#{payload[:name]} (#{event.duration.round(1)}ms)"
name = "CACHE #{name}" if payload[:cached]
name = "ASYNC #{name}" if payload[:async]
sql = payload[:sql]
binds = nil

@ -240,6 +240,7 @@ class Railtie < Rails::Railtie # :nodoc:
initializer "active_record.set_executor_hooks" do
ActiveRecord::QueryCache.install_executor_hooks
ActiveRecord::AsynchronousQueriesTracker.install_executor_hooks
end
initializer "active_record.add_watchable_files" do |app|

@ -91,6 +91,10 @@ def last(n = nil)
n ? hash_rows.last(n) : hash_rows.last
end
# Returns the receiver itself.
# NOTE(review): presumably lets callers call +result+ without caring
# whether they hold a Result or a FutureResult -- confirm.
def result # :nodoc:
self
end
def cast_values(type_overrides = {}) # :nodoc:
if columns.one?
# Separated to avoid allocating an array per row

@ -343,6 +343,120 @@ def test_in_clause_length_is_deprecated
end
end
# Async select_all tests shared by the transactional and
# non-transactional test classes. Each including class is expected to
# set @connection in its setup.
module AsynchronousQueriesSharedTests
# A failing async query still returns a FutureResult; the database error
# only surfaces when the result is requested.
def test_async_select_failure
ActiveRecord::Base.asynchronous_queries_tracker.start_session
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
# An async query is accepted before @connection.transaction is opened,
# and raises AsynchronousQueryInsideTransactionError inside that block.
def test_async_query_from_transaction
ActiveRecord::Base.asynchronous_queries_tracker.start_session
assert_nothing_raised do
@connection.select_all "SELECT * FROM posts", async: true
end
@connection.transaction do
assert_raises AsynchronousQueryInsideTransactionError do
@connection.select_all "SELECT * FROM posts", async: true
end
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
# With the query cache enabled and primed by a synchronous query, the
# same query issued with async: true returns a plain Result (the cached
# entry), not a FutureResult.
def test_async_query_cache
ActiveRecord::Base.asynchronous_queries_tracker.start_session
@connection.enable_query_cache!
@connection.select_all "SELECT * FROM posts"
result = @connection.select_all "SELECT * FROM posts", async: true
assert_equal Result, result.class
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
@connection.disable_query_cache!
end
# No session is started here: the query must still be executed and
# return a FutureResult, but its notification payload must report
# async: false.
def test_async_query_outside_session
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM does_not_exists"
status[:executed] = true
status[:async] = event.payload[:async]
end
end
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
assert_equal true, status[:executed]
assert_equal false, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
# Runs the shared async tests without transactional tests, plus a test
# that waits for the query's notification before reading the result.
class AsynchronousQueriesTest < ActiveRecord::TestCase
self.use_transactional_tests = false
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
end
# Waits (via a Monitor condition signalled from the notification
# subscriber) until the query has been executed, then checks that the
# payload's :async flag equals supports_concurrent_connections?.
def test_async_select_all
ActiveRecord::Base.asynchronous_queries_tracker.start_session
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM posts"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
future_result = @connection.select_all "SELECT * FROM posts", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
# wait_until re-checks the flag, so a signal sent before we start
# waiting is not lost.
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_kind_of ActiveRecord::Result, future_result.result
assert_equal @connection.supports_concurrent_connections?, status[:async]
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
# Runs the shared async tests with transactional tests enabled.
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
self.use_transactional_tests = true
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
# NOTE(review): presumably forces the lazily-opened test transaction to
# be really started before each test -- confirm.
@connection.materialize_transactions
end
end
class AdapterForeignKeyTest < ActiveRecord::TestCase
self.use_transactional_tests = false

@ -74,6 +74,28 @@ def test_connections_not_closed_if_exception_inside_transaction
end
end
# The app issues an async query and then raises. Once the exception has
# propagated through the middleware, reading the future result must
# raise FutureResult::Canceled.
# NOTE(review): relies on middleware(app) running the app inside the
# executor whose hooks finalize the session -- defined outside this hunk.
test "cancel asynchronous queries if an exception is raised" do
unless ActiveRecord::Base.connection.supports_concurrent_connections?
skip "This adapter doesn't support asynchronous queries"
end
app = Class.new(App) do
attr_reader :future_result
def call(env)
@future_result = ActiveRecord::Base.connection.select_all("SELECT * FROM does_not_exists", async: true)
raise NotImplementedError
end
end.new
explosive = middleware(app)
assert_raises(NotImplementedError) { explosive.call(@env) }
assert_raises FutureResult::Canceled do
app.future_result.to_a
end
end
test "doesn't clear active connections when running in a test case" do
executor.wrap do
@management.call(@env)
@ -100,6 +122,7 @@ def test_connections_not_closed_if_exception_inside_transaction
def executor
@executor ||= Class.new(ActiveSupport::Executor).tap do |exe|
ActiveRecord::QueryCache.install_executor_hooks(exe)
ActiveRecord::AsynchronousQueriesTracker.install_executor_hooks(exe)
end
end