rails/activejob/lib/active_job/instrumentation.rb
Sander Verdonschot 9b62f88a2f
Add perform_all_later to enqueue multiple jobs at once
Sidekiq has a useful optimisation called `push_bulk` that enqueues many jobs at
once, eliminating the repeated Redis roundtrips. However, this feature is not
exposed through Active Job, so it only works for `Sidekiq::Worker` jobs. This
adds a barrier to Active Job adoption for apps that rely on this feature. It
also makes it harder for other queue adapters to implement similar
functionality, as they then have to take care of serialization, callbacks, etc.
themselves.

This commit adds `ActiveJob.perform_all_later(<job1>, <job2>)`, backed by
Sidekiq's `push_bulk` and with a fallback to enqueuing serially if the queue
adapter does not support bulk enqueue.
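The fallback described above can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: `BulkAdapter`, `SerialAdapter`, and `perform_all_later_sketch` are hypothetical names, and the sketch assumes an adapter signals bulk support by responding to `enqueue_all`.

```ruby
# Hypothetical adapters, for illustration only; not the real Active Job classes.
class BulkAdapter
  attr_reader :round_trips

  def initialize
    @round_trips = 0
  end

  # One round trip for the whole batch, similar in spirit to Sidekiq's push_bulk.
  def enqueue_all(jobs)
    @round_trips += 1
    jobs.size
  end
end

class SerialAdapter
  attr_reader :round_trips

  def initialize
    @round_trips = 0
  end

  # One round trip per job.
  def enqueue(_job)
    @round_trips += 1
    true
  end
end

# Assumption: bulk support is detected with respond_to?(:enqueue_all).
# Falls back to enqueuing serially and returns the number of jobs enqueued.
def perform_all_later_sketch(adapter, jobs)
  if adapter.respond_to?(:enqueue_all)
    adapter.enqueue_all(jobs)
  else
    jobs.count { |job| adapter.enqueue(job) }
  end
end

jobs = Array.new(1000) { |i| "job-#{i}" }
bulk = BulkAdapter.new
serial = SerialAdapter.new

perform_all_later_sketch(bulk, jobs)
perform_all_later_sketch(serial, jobs)
puts "bulk round trips: #{bulk.round_trips}, serial round trips: #{serial.round_trips}"
```

The round-trip counters stand in for network calls to the queue backend. The sketch ignores serialization and callbacks, which the real implementation has to handle.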

The performance benefit for 1000 jobs can be more than an order of magnitude:

| Enqueue type       | Serial time (ms) | Bulk time (ms) | Speedup |
| ------------------ | ---------------- | -------------- | ------- |
| Raw Sidekiq        |             2661 |            119 |     22x |
| Active Job Sidekiq |             2853 |            208 |     14x |

(Measured in a simple test app in our production environment.)

Instrumentation for `perform_all_later` uses a new event, `enqueue_all.active_job`.
2023-02-02 16:39:22 -05:00


# frozen_string_literal: true

module ActiveJob
  class << self
    private
      def instrument_enqueue_all(queue_adapter, jobs)
        payload = { adapter: queue_adapter, jobs: jobs }
        ActiveSupport::Notifications.instrument("enqueue_all.active_job", payload) do
          result = yield payload
          payload[:enqueued_count] = result
          result
        end
      end
  end

  module Instrumentation # :nodoc:
    extend ActiveSupport::Concern

    included do
      around_enqueue do |_, block|
        scheduled_at ? instrument(:enqueue_at, &block) : instrument(:enqueue, &block)
      end
    end

    def perform_now
      instrument(:perform) { super }
    end

    private
      def _perform_job
        instrument(:perform_start)
        super
      end

      def instrument(operation, payload = {}, &block)
        payload[:job] = self
        payload[:adapter] = queue_adapter

        ActiveSupport::Notifications.instrument("#{operation}.active_job", payload) do
          value = block.call if block
          payload[:aborted] = @_halted_callback_hook_called if defined?(@_halted_callback_hook_called)
          @_halted_callback_hook_called = nil
          value
        end
      end

      def halted_callback_hook(*)
        super
        @_halted_callback_hook_called = true
      end
  end
end
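
`instrument_enqueue_all` mutates the payload inside the instrumented block, so a subscriber to `enqueue_all.active_job` can read `:enqueued_count` once the block has finished. The sketch below shows that pattern using only the standard library. `TinyNotifications` is a hypothetical, much-simplified stand-in for `ActiveSupport::Notifications`, and `instrument_enqueue_all_sketch` mirrors the method above for illustration only.

```ruby
# Hypothetical stand-in for ActiveSupport::Notifications, illustration only.
module TinyNotifications
  @subscribers = Hash.new { |hash, name| hash[name] = [] }

  class << self
    # Register a callable that receives (name, payload) after each event.
    def subscribe(name, &callback)
      @subscribers[name] << callback
    end

    # Run the block, then hand the same payload hash to every subscriber.
    # The block's return value is returned to the caller.
    def instrument(name, payload = {})
      yield payload if block_given?
    ensure
      @subscribers[name].each { |callback| callback.call(name, payload) }
    end
  end
end

# Same shape as instrument_enqueue_all above, built on the stand-in.
def instrument_enqueue_all_sketch(adapter, jobs)
  payload = { adapter: adapter, jobs: jobs }
  TinyNotifications.instrument("enqueue_all.active_job", payload) do
    result = yield payload
    payload[:enqueued_count] = result
    result
  end
end

TinyNotifications.subscribe("enqueue_all.active_job") do |_name, payload|
  puts "enqueued #{payload[:enqueued_count]} of #{payload[:jobs].size} jobs"
end

# The block plays the role of the adapter call and returns the enqueued count.
instrument_enqueue_all_sketch(:fake_adapter, %w[a b c]) { |payload| payload[:jobs].size }
```

Because the subscriber receives the same hash the block wrote to, no separate return channel is needed for the count. In a real application the subscription would go through `ActiveSupport::Notifications.subscribe`, whose callback arguments differ from this simplified version.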