e41d1acc91
We recently made improvements to the name extraction for `MyJob.queue_adapter_name` (see https://github.com/rails/rails/pull/47995 and https://github.com/rails/rails/pull/48003). I had overlooked that we also extract the queue adapter name in a similar (but not identical) way for log messages, so this change aligns the two.
212 lines
6.5 KiB
Ruby
212 lines
6.5 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "active_support/core_ext/string/filters"
|
|
require "active_support/log_subscriber"
|
|
|
|
module ActiveJob
|
|
class LogSubscriber < ActiveSupport::LogSubscriber # :nodoc:
|
|
class_attribute :backtrace_cleaner, default: ActiveSupport::BacktraceCleaner.new
|
|
|
|
def enqueue(event)
|
|
job = event.payload[:job]
|
|
ex = event.payload[:exception_object]
|
|
|
|
if ex
|
|
error do
|
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})"
|
|
end
|
|
elsif event.payload[:aborted]
|
|
info do
|
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution."
|
|
end
|
|
else
|
|
info do
|
|
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
|
|
end
|
|
end
|
|
end
|
|
subscribe_log_level :enqueue, :info
|
|
|
|
def enqueue_at(event)
|
|
job = event.payload[:job]
|
|
ex = event.payload[:exception_object]
|
|
|
|
if ex
|
|
error do
|
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})"
|
|
end
|
|
elsif event.payload[:aborted]
|
|
info do
|
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution."
|
|
end
|
|
else
|
|
info do
|
|
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
|
|
end
|
|
end
|
|
end
|
|
subscribe_log_level :enqueue_at, :info
|
|
|
|
def enqueue_all(event)
|
|
info do
|
|
jobs = event.payload[:jobs]
|
|
adapter = event.payload[:adapter]
|
|
enqueued_count = event.payload[:enqueued_count]
|
|
|
|
if enqueued_count == jobs.size
|
|
enqueued_jobs_message(adapter, jobs)
|
|
elsif jobs.any?(&:successfully_enqueued?)
|
|
enqueued_jobs = jobs.select(&:successfully_enqueued?)
|
|
|
|
failed_enqueue_count = jobs.size - enqueued_count
|
|
if failed_enqueue_count == 0
|
|
enqueued_jobs_message(adapter, enqueued_jobs)
|
|
else
|
|
"#{enqueued_jobs_message(adapter, enqueued_jobs)}. "\
|
|
"Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}"
|
|
end
|
|
else
|
|
failed_enqueue_count = jobs.size - enqueued_count
|
|
"Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)} "\
|
|
"to #{ActiveJob.adapter_name(adapter)}"
|
|
end
|
|
end
|
|
end
|
|
subscribe_log_level :enqueue_all, :info
|
|
|
|
def perform_start(event)
|
|
info do
|
|
job = event.payload[:job]
|
|
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
|
|
end
|
|
end
|
|
subscribe_log_level :perform_start, :info
|
|
|
|
def perform(event)
|
|
job = event.payload[:job]
|
|
ex = event.payload[:exception_object]
|
|
if ex
|
|
error do
|
|
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
|
|
end
|
|
elsif event.payload[:aborted]
|
|
error do
|
|
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: a before_perform callback halted the job execution"
|
|
end
|
|
else
|
|
info do
|
|
"Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
|
|
end
|
|
end
|
|
end
|
|
subscribe_log_level :perform, :info
|
|
|
|
def enqueue_retry(event)
|
|
job = event.payload[:job]
|
|
ex = event.payload[:error]
|
|
wait = event.payload[:wait]
|
|
|
|
info do
|
|
if ex
|
|
"Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds, due to a #{ex.class} (#{ex.message})."
|
|
else
|
|
"Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds."
|
|
end
|
|
end
|
|
end
|
|
subscribe_log_level :enqueue_retry, :info
|
|
|
|
def retry_stopped(event)
|
|
job = event.payload[:job]
|
|
ex = event.payload[:error]
|
|
|
|
error do
|
|
"Stopped retrying #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message}), which reoccurred on #{job.executions} attempts."
|
|
end
|
|
end
|
|
subscribe_log_level :enqueue_retry, :error
|
|
|
|
def discard(event)
|
|
job = event.payload[:job]
|
|
ex = event.payload[:error]
|
|
|
|
error do
|
|
"Discarded #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message})."
|
|
end
|
|
end
|
|
subscribe_log_level :discard, :error
|
|
|
|
private
|
|
def queue_name(event)
|
|
ActiveJob.adapter_name(event.payload[:adapter]) + "(#{event.payload[:job].queue_name})"
|
|
end
|
|
|
|
def args_info(job)
|
|
if job.class.log_arguments? && job.arguments.any?
|
|
" with arguments: " +
|
|
job.arguments.map { |arg| format(arg).inspect }.join(", ")
|
|
else
|
|
""
|
|
end
|
|
end
|
|
|
|
def format(arg)
|
|
case arg
|
|
when Hash
|
|
arg.transform_values { |value| format(value) }
|
|
when Array
|
|
arg.map { |value| format(value) }
|
|
when GlobalID::Identification
|
|
arg.to_global_id rescue arg
|
|
else
|
|
arg
|
|
end
|
|
end
|
|
|
|
def scheduled_at(event)
|
|
Time.at(event.payload[:job].scheduled_at).utc
|
|
end
|
|
|
|
def logger
|
|
ActiveJob::Base.logger
|
|
end
|
|
|
|
def info(progname = nil, &block)
|
|
return unless super
|
|
|
|
if ActiveJob.verbose_enqueue_logs
|
|
log_enqueue_source
|
|
end
|
|
end
|
|
|
|
def error(progname = nil, &block)
|
|
return unless super
|
|
|
|
if ActiveJob.verbose_enqueue_logs
|
|
log_enqueue_source
|
|
end
|
|
end
|
|
|
|
def log_enqueue_source
|
|
source = extract_enqueue_source_location(caller)
|
|
|
|
if source
|
|
logger.info("↳ #{source}")
|
|
end
|
|
end
|
|
|
|
def extract_enqueue_source_location(locations)
|
|
backtrace_cleaner.clean(locations.lazy).first
|
|
end
|
|
|
|
def enqueued_jobs_message(adapter, enqueued_jobs)
|
|
enqueued_count = enqueued_jobs.size
|
|
job_classes_counts = enqueued_jobs.map(&:class).tally.sort_by { |_k, v| -v }
|
|
"Enqueued #{enqueued_count} #{'job'.pluralize(enqueued_count)} to #{ActiveJob.adapter_name(adapter)}"\
|
|
" (#{job_classes_counts.map { |klass, count| "#{count} #{klass}" }.join(', ')})"
|
|
end
|
|
end
|
|
end
|
|
|
|
ActiveJob::LogSubscriber.attach_to :active_job
|