Add perform_all_later to enqueue multiple jobs at once

Sidekiq has a useful optimisation called `push_bulk` that enqueues many jobs at
once, eliminating the repeated Redis roundtrips. However, this feature is not
exposed through Active Job, so it only works for `Sidekiq::Worker` jobs. This
adds a barrier to Active Job adoption for apps that rely on this feature. It
also makes it harder for other queue adapters to implement similar
functionality, as they then have to take care of serialization, callbacks, etc.
themselves.

This commit adds `ActiveJob.perform_all_later(<job1>, <job2>)`, backed by
Sidekiq's `push_bulk` and with a fallback to enqueuing serially if the queue
adapter does not support bulk enqueue.

The performance benefit for 1000 jobs can be more than an order of magnitude:

| Enqueue type       | Serial time (ms) | Bulk time (ms) | Speedup |
| ------------------ | ---------------- | -------------- | ------- |
| Raw Sidekiq        |             2661 |            119 |     22x |
| Active Job Sidekiq |             2853 |            208 |     14x |

(Measured in a simple test app in our production environment.)

Instrumentation for perform_all_later uses a new event `enqueue_all.active_job`
This commit is contained in:
Sander Verdonschot 2022-11-02 16:53:10 -04:00
parent 7eb76dabbb
commit 9b62f88a2f
No known key found for this signature in database
GPG Key ID: 447E1882AA7DF0CD
7 changed files with 120 additions and 0 deletions

@ -1,3 +1,25 @@
* Add `perform_all_later` to enqueue multiple jobs at once
This adds the ability to bulk enqueue jobs, without running callbacks, by
passing multiple jobs or an array of jobs. For example:
```ruby
ActiveJob.perform_all_later(MyJob.new("hello", 42), MyJob.new("world", 0))
user_jobs = User.pluck(:id).map { |id| UserJob.new(user_id: id) }
ActiveJob.perform_all_later(user_jobs)
```
This can greatly reduce the number of round-trips to the queue datastore.
For queue adapters that do not implement the new `enqueue_all` method, we
fall back to enqueuing jobs indvidually. The Sidekiq adapter implements
`enqueue_all` with `push_bulk`.
This method does not use the existing `enqueue.active_job` event, but adds a
new event `enqueue_all.active_job`.
*Sander Verdonschot*
* Don't double log the `job` when using `ActiveRecord::QueryLog`
Previously if you set `config.active_record.query_log_tags` to an array that included

@ -14,5 +14,9 @@ def perform_now(...)
def perform_later(...)
@job_class.new(...).enqueue @options
end
def perform_all_later(multi_args)
@job_class.perform_all_later(multi_args, options: @options)
end
end
end

@ -9,6 +9,35 @@ module ActiveJob
# why the adapter was unexpectedly unable to enqueue a job.
class EnqueueError < StandardError; end
class << self
# Push many jobs onto the queue at once without running enqueue callbacks.
# Queue adapters may communicate the enqueue status of each job by setting
# successfully_enqueued and/or enqueue_error on the passed-in job instances.
def perform_all_later(*jobs)
jobs.flatten!
jobs.group_by(&:queue_adapter).each do |queue_adapter, adapter_jobs|
instrument_enqueue_all(queue_adapter, adapter_jobs) do
if queue_adapter.respond_to?(:enqueue_all)
queue_adapter.enqueue_all(adapter_jobs)
else
adapter_jobs.each do |job|
job.successfully_enqueued = false
if job.scheduled_at
queue_adapter.enqueue_at(job, job.scheduled_at)
else
queue_adapter.enqueue(job)
end
job.successfully_enqueued = true
rescue EnqueueError => e
job.enqueue_error = e
end
end
end
end
nil
end
end
module Enqueuing
extend ActiveSupport::Concern

@ -1,6 +1,18 @@
# frozen_string_literal: true
module ActiveJob
class << self
private
def instrument_enqueue_all(queue_adapter, jobs)
payload = { adapter: queue_adapter, jobs: jobs }
ActiveSupport::Notifications.instrument("enqueue_all.active_job", payload) do
result = yield payload
payload[:enqueued_count] = result
result
end
end
end
module Instrumentation # :nodoc:
extend ActiveSupport::Concern

@ -32,6 +32,33 @@ def enqueue_at(job, timestamp) # :nodoc:
).perform_at(timestamp, job.serialize)
end
def enqueue_all(jobs) # :nodoc:
jobs.group_by(&:class).each do |job_class, same_class_jobs|
same_class_jobs.group_by(&:queue_name).each do |queue, same_class_and_queue_jobs|
immediate_jobs, scheduled_jobs = same_class_and_queue_jobs.partition { |job| job.scheduled_at.nil? }
if immediate_jobs.any?
Sidekiq::Client.push_bulk(
"class" => JobWrapper,
"wrapped" => job_class,
"queue" => queue,
"args" => immediate_jobs.map { |job| [job.serialize] },
)
end
if scheduled_jobs.any?
Sidekiq::Client.push_bulk(
"class" => JobWrapper,
"wrapped" => job_class,
"queue" => queue,
"args" => scheduled_jobs.map { |job| [job.serialize] },
"at" => scheduled_jobs.map { |job| job.scheduled_at }
)
end
end
end
end
class JobWrapper # :nodoc:
include Sidekiq::Worker

@ -3,6 +3,7 @@
require "helper"
require "jobs/hello_job"
require "jobs/enqueue_error_job"
require "jobs/multiple_kwargs_job"
require "active_support/core_ext/numeric/time"
class QueuingTest < ActiveSupport::TestCase
@ -54,4 +55,19 @@ class QueuingTest < ActiveSupport::TestCase
assert_equal ActiveJob::EnqueueError, job.enqueue_error.class
end
end
test "run multiple queued jobs" do
ActiveJob.perform_all_later(HelloJob.new("Jamie"), HelloJob.new("John"))
assert_equal ["Jamie says hello", "John says hello"], JobBuffer.values.sort
end
test "run multiple queued jobs passed as array" do
ActiveJob.perform_all_later([HelloJob.new("Jamie"), HelloJob.new("John")])
assert_equal ["Jamie says hello", "John says hello"], JobBuffer.values.sort
end
test "run multiple queued jobs of different classes" do
ActiveJob.perform_all_later([HelloJob.new("Jamie"), MultipleKwargsJob.new(argument1: "John", argument2: 42)])
assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort
end
end

@ -77,6 +77,16 @@ class QueuingTest < ActiveSupport::TestCase
skip
end
test "should run job bulk enqueued in the future at the specified time" do
ActiveJob.perform_all_later([TestJob.new(@id).set(wait: 5.seconds)])
wait_for_jobs_to_finish_for(2.seconds)
assert_job_not_executed
wait_for_jobs_to_finish_for(10.seconds)
assert_job_executed
rescue NotImplementedError
skip
end
test "should supply a provider_job_id when available for immediate jobs" do
skip unless adapter_is?(:async, :delayed_job, :sidekiq, :queue_classic)
test_job = TestJob.perform_later @id