rails/activejob/lib/active_job/enqueuing.rb

128 lines
4.3 KiB
Ruby
Raw Normal View History

2017-07-09 17:49:52 +00:00
# frozen_string_literal: true
2014-05-19 10:06:09 +00:00
module ActiveJob
# Provides behavior for enqueuing jobs.
Communicate enqueue failures to callers of perform_later There is presently no clean way of telling a caller of `perform_later` the reason why a job failed to enqueue. When the job is enqueued successfully, the job object itself is returned, but when the job can not be enqueued, only `false` is returned. This does not allow callers to distinguish between classes of failures. One important class of failures is when the job backend experiences a network partition when communicating with its underlying datastore. It is entirely possible for that network partition to recover and as such, code attempting to enqueue a job may wish to take action to reenqueue that job after a brief delay. This is distinguished from the class of failures where due a business rule defined in a callback in the application, a job fails to enqueue and should not be retried. This PR changes the following: - Allows a block to be passed to the `perform_later` method. After the `enqueue` method is executed, but before the result is returned, the job will be yielded to the block. This allows the code invoking the `perform_later` method to inspect the job object, even in failure scenarios. - Adds an exception `EnqueueError` which job adapters can raise if they detect a problem specific to their underlying implementation or infrastructure during the enqueue process. - Adds two properties to the job base class: `successfully_enqueued` and `enqueue_error`. `enqueue_error` will be populated by the `enqueue` method if it rescues an `EnqueueError` raised by the job backend. `successfully_enqueued` will be true if the job is not rejected by callbacks and does not cause the job backend to raise an `EnqueueError` and will be `false` otherwise. This will allow developers to do something like the following: MyJob.perform_later do |job| unless job.successfully_enqueued? if job.enqueue_error&.message == "Redis was unavailable" # invoke some code that will retry the job after a delay end end end
2021-01-19 14:57:46 +00:00
# Can be raised by adapters if they wish to communicate to the caller a reason
# why the adapter was unexpectedly unable to enqueue a job.
class EnqueueError < StandardError; end
class << self
# Push many jobs onto the queue at once without running enqueue callbacks.
# Queue adapters may communicate the enqueue status of each job by setting
# successfully_enqueued and/or enqueue_error on the passed-in job instances.
def perform_all_later(*jobs)
jobs.flatten!
jobs.group_by(&:queue_adapter).each do |queue_adapter, adapter_jobs|
instrument_enqueue_all(queue_adapter, adapter_jobs) do
if queue_adapter.respond_to?(:enqueue_all)
queue_adapter.enqueue_all(adapter_jobs)
else
adapter_jobs.each do |job|
job.successfully_enqueued = false
if job.scheduled_at
queue_adapter.enqueue_at(job, job.scheduled_at.to_f)
else
queue_adapter.enqueue(job)
end
job.successfully_enqueued = true
rescue EnqueueError => e
job.enqueue_error = e
end
adapter_jobs.count(&:successfully_enqueued?)
end
end
end
nil
end
end
2014-05-19 10:06:09 +00:00
module Enqueuing
extend ActiveSupport::Concern
2014-05-30 23:19:30 +00:00
# Includes the +perform_later+ method for job initialization.
module ClassMethods
# Push a job onto the queue. By default the arguments must be either String,
# Integer, Float, NilClass, TrueClass, FalseClass, BigDecimal, Symbol, Date,
# Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration,
2022-02-18 23:16:04 +00:00
# Hash, ActiveSupport::HashWithIndifferentAccess, Array, Range, or
# GlobalID::Identification instances, although this can be extended by adding
# custom serializers.
#
# Returns an instance of the job class queued with arguments available in
# Job#arguments or +false+ if the enqueue did not succeed.
Communicate enqueue failures to callers of perform_later There is presently no clean way of telling a caller of `perform_later` the reason why a job failed to enqueue. When the job is enqueued successfully, the job object itself is returned, but when the job can not be enqueued, only `false` is returned. This does not allow callers to distinguish between classes of failures. One important class of failures is when the job backend experiences a network partition when communicating with its underlying datastore. It is entirely possible for that network partition to recover and as such, code attempting to enqueue a job may wish to take action to reenqueue that job after a brief delay. This is distinguished from the class of failures where due a business rule defined in a callback in the application, a job fails to enqueue and should not be retried. This PR changes the following: - Allows a block to be passed to the `perform_later` method. After the `enqueue` method is executed, but before the result is returned, the job will be yielded to the block. This allows the code invoking the `perform_later` method to inspect the job object, even in failure scenarios. - Adds an exception `EnqueueError` which job adapters can raise if they detect a problem specific to their underlying implementation or infrastructure during the enqueue process. - Adds two properties to the job base class: `successfully_enqueued` and `enqueue_error`. `enqueue_error` will be populated by the `enqueue` method if it rescues an `EnqueueError` raised by the job backend. `successfully_enqueued` will be true if the job is not rejected by callbacks and does not cause the job backend to raise an `EnqueueError` and will be `false` otherwise. This will allow developers to do something like the following: MyJob.perform_later do |job| unless job.successfully_enqueued? if job.enqueue_error&.message == "Redis was unavailable" # invoke some code that will retry the job after a delay end end end
2021-01-19 14:57:46 +00:00
#
# After the attempted enqueue, the job will be yielded to an optional block.
#
# If Active Job is used conjointly with Active Record, and #perform_later is called
# inside an Active Record transaction, then the enqueue is implicitly deferred to after
# the transaction is committed, or dropped if it's rolled back. In such case #perform_later
# will return the job instance like if it was successfully enqueued, but will still return
# +false+ if a callback prevented the job from being enqueued.
#
# This behavior can be changed on a per job basis:
#
# class NotificationJob < ApplicationJob
# self.enqueue_after_transaction_commit = false
# end
def perform_later(...)
job = job_or_instantiate(...)
Communicate enqueue failures to callers of perform_later There is presently no clean way of telling a caller of `perform_later` the reason why a job failed to enqueue. When the job is enqueued successfully, the job object itself is returned, but when the job can not be enqueued, only `false` is returned. This does not allow callers to distinguish between classes of failures. One important class of failures is when the job backend experiences a network partition when communicating with its underlying datastore. It is entirely possible for that network partition to recover and as such, code attempting to enqueue a job may wish to take action to reenqueue that job after a brief delay. This is distinguished from the class of failures where due a business rule defined in a callback in the application, a job fails to enqueue and should not be retried. This PR changes the following: - Allows a block to be passed to the `perform_later` method. After the `enqueue` method is executed, but before the result is returned, the job will be yielded to the block. This allows the code invoking the `perform_later` method to inspect the job object, even in failure scenarios. - Adds an exception `EnqueueError` which job adapters can raise if they detect a problem specific to their underlying implementation or infrastructure during the enqueue process. - Adds two properties to the job base class: `successfully_enqueued` and `enqueue_error`. `enqueue_error` will be populated by the `enqueue` method if it rescues an `EnqueueError` raised by the job backend. `successfully_enqueued` will be true if the job is not rejected by callbacks and does not cause the job backend to raise an `EnqueueError` and will be `false` otherwise. This will allow developers to do something like the following: MyJob.perform_later do |job| unless job.successfully_enqueued? if job.enqueue_error&.message == "Redis was unavailable" # invoke some code that will retry the job after a delay end end end
2021-01-19 14:57:46 +00:00
enqueue_result = job.enqueue
yield job if block_given?
enqueue_result
end
private
def job_or_instantiate(*args, &_) # :doc:
2014-08-25 14:34:50 +00:00
args.first.is_a?(self) ? args.first : new(*args)
end
ruby2_keywords(:job_or_instantiate)
end
2014-05-30 23:19:30 +00:00
# Enqueues the job to be performed by the queue adapter.
2014-08-25 14:34:50 +00:00
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
2014-08-25 14:34:50 +00:00
# * <tt>:queue</tt> - Enqueues the job on the specified queue
2015-03-18 09:48:26 +00:00
# * <tt>:priority</tt> - Enqueues the job with the specified priority
2014-08-25 14:34:50 +00:00
#
# ==== Examples
#
# my_job_instance.enqueue
# my_job_instance.enqueue wait: 5.minutes
2014-08-25 14:34:50 +00:00
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
2015-03-18 09:48:26 +00:00
# my_job_instance.enqueue priority: 10
def enqueue(options = {})
set(options)
Communicate enqueue failures to callers of perform_later There is presently no clean way of telling a caller of `perform_later` the reason why a job failed to enqueue. When the job is enqueued successfully, the job object itself is returned, but when the job can not be enqueued, only `false` is returned. This does not allow callers to distinguish between classes of failures. One important class of failures is when the job backend experiences a network partition when communicating with its underlying datastore. It is entirely possible for that network partition to recover and as such, code attempting to enqueue a job may wish to take action to reenqueue that job after a brief delay. This is distinguished from the class of failures where due a business rule defined in a callback in the application, a job fails to enqueue and should not be retried. This PR changes the following: - Allows a block to be passed to the `perform_later` method. After the `enqueue` method is executed, but before the result is returned, the job will be yielded to the block. This allows the code invoking the `perform_later` method to inspect the job object, even in failure scenarios. - Adds an exception `EnqueueError` which job adapters can raise if they detect a problem specific to their underlying implementation or infrastructure during the enqueue process. - Adds two properties to the job base class: `successfully_enqueued` and `enqueue_error`. `enqueue_error` will be populated by the `enqueue` method if it rescues an `EnqueueError` raised by the job backend. `successfully_enqueued` will be true if the job is not rejected by callbacks and does not cause the job backend to raise an `EnqueueError` and will be `false` otherwise. This will allow developers to do something like the following: MyJob.perform_later do |job| unless job.successfully_enqueued? if job.enqueue_error&.message == "Redis was unavailable" # invoke some code that will retry the job after a delay end end end
2021-01-19 14:57:46 +00:00
self.successfully_enqueued = false
2014-08-25 14:34:50 +00:00
run_callbacks :enqueue do
raw_enqueue
end
if successfully_enqueued?
self
else
false
end
end
private
def raw_enqueue
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at.to_f
2014-08-25 14:34:50 +00:00
else
queue_adapter.enqueue self
2014-08-25 14:34:50 +00:00
end
Communicate enqueue failures to callers of perform_later There is presently no clean way of telling a caller of `perform_later` the reason why a job failed to enqueue. When the job is enqueued successfully, the job object itself is returned, but when the job can not be enqueued, only `false` is returned. This does not allow callers to distinguish between classes of failures. One important class of failures is when the job backend experiences a network partition when communicating with its underlying datastore. It is entirely possible for that network partition to recover and as such, code attempting to enqueue a job may wish to take action to reenqueue that job after a brief delay. This is distinguished from the class of failures where due a business rule defined in a callback in the application, a job fails to enqueue and should not be retried. This PR changes the following: - Allows a block to be passed to the `perform_later` method. After the `enqueue` method is executed, but before the result is returned, the job will be yielded to the block. This allows the code invoking the `perform_later` method to inspect the job object, even in failure scenarios. - Adds an exception `EnqueueError` which job adapters can raise if they detect a problem specific to their underlying implementation or infrastructure during the enqueue process. - Adds two properties to the job base class: `successfully_enqueued` and `enqueue_error`. `enqueue_error` will be populated by the `enqueue` method if it rescues an `EnqueueError` raised by the job backend. `successfully_enqueued` will be true if the job is not rejected by callbacks and does not cause the job backend to raise an `EnqueueError` and will be `false` otherwise. This will allow developers to do something like the following: MyJob.perform_later do |job| unless job.successfully_enqueued? if job.enqueue_error&.message == "Redis was unavailable" # invoke some code that will retry the job after a delay end end end
2021-01-19 14:57:46 +00:00
self.successfully_enqueued = true
rescue EnqueueError => e
self.enqueue_error = e
end
2014-05-19 10:06:09 +00:00
end
2014-05-19 20:13:40 +00:00
end