From 0d3aec496955f38c21bf8042d2510d5b3ade8a00 Mon Sep 17 00:00:00 2001 From: Edouard CHIN Date: Thu, 28 Nov 2019 23:40:24 +0100 Subject: [PATCH] Fix ActiveJob logging when callback chain is halted: - ### Problem ActiveJob will always log "Enqueued MyJob (Job ID) ..." even if the job doesn't get enqueued through the adapter. Same problem happens when performing a Job, "Performed MyJob (Job ID) ..." will be logged even when job wasn't performed at all. This situation can happen either if the callback chain is terminated (before_enqueue throwing an `abort`) or if an exception is raised. ### Solution Check if the callback chain is aborted/exception is raised, and log accordingly. --- activejob/CHANGELOG.md | 17 +++++++ activejob/lib/active_job/instrumentation.rb | 7 ++- activejob/lib/active_job/log_subscriber.rb | 40 +++++++++++++--- activejob/test/cases/logging_test.rb | 48 +++++++++++++++++++ .../test/jobs/abort_before_enqueue_job.rb | 13 ++++- 5 files changed, 117 insertions(+), 8 deletions(-) diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index d3583f6fad..1d3e89c184 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1,3 +1,20 @@ +* Fix enqueuing and performing incorrect logging message. + + Jobs will no longer always log "Enqueued MyJob" or "Performed MyJob" when they actually didn't get enqueued/performed. + + ```ruby + class MyJob < ApplicationJob + before_enqueue { throw(:abort) } + end + + MyJob.perform_later # Will no longer log "Enqueud MyJob" since job wasn't even enqueued through adapter. + ``` + + A new message will be logged in case a job couldn't be enqueued, either because the callback chain was halted or + because an exception happened during enqueing. (i.e. Redis is down when you try to enqueue your job) + + *Edouard Chin* + * Add an option to disable logging of the job arguments when enqueuing and executing the job. class SensitiveJob < ApplicationJob diff --git a/activejob/lib/active_job/instrumentation.rb b/activejob/lib/active_job/instrumentation.rb index ed3eebd626..1534da3561 100644 --- a/activejob/lib/active_job/instrumentation.rb +++ b/activejob/lib/active_job/instrumentation.rb @@ -17,8 +17,13 @@ module Instrumentation #:nodoc: private def instrument(operation, payload = {}, &block) + enhanced_block = ->(event_payload) do + aborted = !block.call if block + event_payload[:aborted] = true if aborted + end + ActiveSupport::Notifications.instrument \ - "#{operation}.active_job", payload.merge(adapter: queue_adapter, job: self), &block + "#{operation}.active_job", payload.merge(adapter: queue_adapter, job: self), &enhanced_block end end end diff --git a/activejob/lib/active_job/log_subscriber.rb b/activejob/lib/active_job/log_subscriber.rb index d6cd90798d..0cf53db2eb 100644 --- a/activejob/lib/active_job/log_subscriber.rb +++ b/activejob/lib/active_job/log_subscriber.rb @@ -5,16 +5,40 @@ module ActiveJob class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc: def enqueue(event) - info do - job = event.payload[:job] - "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) + job = event.payload[:job] + ex = event.payload[:exception_object] + + if ex + error do + "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n") + end + elsif event.payload[:aborted] + info do + "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." + end + else + info do + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) + end end end def enqueue_at(event) - info do - job = event.payload[:job] - "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) + job = event.payload[:job] + ex = event.payload[:exception_object] + + if ex + error do + "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n") + end + elsif event.payload[:aborted] + info do + "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." + end + else + info do + "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) + end end end @@ -32,6 +56,10 @@ def perform(event) error do "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n") end + elsif event.payload[:aborted] + error do + "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: a before_perform callback halted the job execution" + end else info do "Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms" diff --git a/activejob/test/cases/logging_test.rb b/activejob/test/cases/logging_test.rb index 2cd43c5b84..f21df62e2e 100644 --- a/activejob/test/cases/logging_test.rb +++ b/activejob/test/cases/logging_test.rb @@ -10,6 +10,7 @@ require "jobs/rescue_job" require "jobs/retry_job" require "jobs/disable_log_job" +require "jobs/abort_before_enqueue_job" require "models/person" class LoggingTest < ActiveSupport::TestCase @@ -112,6 +113,27 @@ def test_enqueue_job_logging assert_equal(key, "enqueue.active_job") end + def test_enqueue_job_log_error_when_callback_chain_is_halted + events = subscribed { AbortBeforeEnqueueJob.perform_later } + assert_match(/Failed enqueuing AbortBeforeEnqueueJob.* a before_enqueue callback halted/, @logger.messages) + assert_equal(events.count, 1) + key, * = events.first + assert_equal(key, "enqueue.active_job") + end + + def test_enqueue_job_log_error_when_error_is_raised_during_callback_chain + events = subscribed do + assert_raises(AbortBeforeEnqueueJob::MyError) do + AbortBeforeEnqueueJob.perform_later(:raise) + end + end + + assert_match(/Failed enqueuing AbortBeforeEnqueueJob/, @logger.messages) + assert_equal(events.count, 1) + key, * = events.first + assert_equal(key, "enqueue.active_job") + end + def test_perform_job_logging perform_enqueued_jobs do LoggingJob.perform_later "Dummy" @@ -123,6 +145,11 @@ def test_perform_job_logging end end + def test_perform_job_log_error_when_callback_chain_is_halted + subscribed { AbortBeforeEnqueueJob.perform_now } + assert_match(/Error performing AbortBeforeEnqueueJob.* a before_perform callback halted/, @logger.messages) + end + def test_perform_disabled_job_logging perform_enqueued_jobs do DisableLogJob.perform_later "Dummy" @@ -159,6 +186,27 @@ def test_enqueue_at_job_logging skip end + def test_enqueue_at_job_log_error_when_callback_chain_is_halted + events = subscribed { AbortBeforeEnqueueJob.set(wait: 1.second).perform_later } + assert_match(/Failed enqueuing AbortBeforeEnqueueJob.* a before_enqueue callback halted/, @logger.messages) + assert_equal(events.count, 1) + key, * = events.first + assert_equal(key, "enqueue_at.active_job") + end + + def test_enqueue_at_job_log_error_when_error_is_raised_during_callback_chain + events = subscribed do + assert_raises(AbortBeforeEnqueueJob::MyError) do + AbortBeforeEnqueueJob.set(wait: 1.second).perform_later(:raise) + end + end + + assert_match(/Failed enqueuing AbortBeforeEnqueueJob/, @logger.messages) + assert_equal(events.count, 1) + key, * = events.first + assert_equal(key, "enqueue_at.active_job") + end + def test_enqueue_in_job_logging events = subscribed { HelloJob.set(wait: 2.seconds).perform_later "Cristian" } assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages) diff --git a/activejob/test/jobs/abort_before_enqueue_job.rb b/activejob/test/jobs/abort_before_enqueue_job.rb index fd278eccf4..8fb4d8078c 100644 --- a/activejob/test/jobs/abort_before_enqueue_job.rb +++ b/activejob/test/jobs/abort_before_enqueue_job.rb @@ -1,9 +1,20 @@ # frozen_string_literal: true class AbortBeforeEnqueueJob < ActiveJob::Base - before_enqueue { throw(:abort) } + MyError = Class.new(StandardError) + + before_enqueue :throw_or_raise + before_perform { throw(:abort) } def perform raise "This should never be called" end + + def throw_or_raise + if (arguments.first || :abort) == :abort + throw(:abort) + else + raise(MyError) + end + end end