diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 9f011108e2..357dcd71d0 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -31,6 +31,7 @@ def perform_all_later(*jobs) rescue EnqueueError => e job.enqueue_error = e end + adapter_jobs.count(&:successfully_enqueued?) end end end diff --git a/activejob/lib/active_job/log_subscriber.rb b/activejob/lib/active_job/log_subscriber.rb index c8a7b6f819..40e6bdb3d0 100644 --- a/activejob/lib/active_job/log_subscriber.rb +++ b/activejob/lib/active_job/log_subscriber.rb @@ -47,6 +47,32 @@ def enqueue_at(event) end subscribe_log_level :enqueue_at, :info + def enqueue_all(event) + info do + jobs = event.payload[:jobs] + adapter = event.payload[:adapter] + enqueued_count = event.payload[:enqueued_count] + + if enqueued_count == jobs.size + enqueued_jobs_message(adapter, jobs) + elsif jobs.any?(&:successfully_enqueued?) + enqueued_jobs = jobs.select(&:successfully_enqueued?) + + failed_enqueue_count = jobs.size - enqueued_count + if failed_enqueue_count == 0 + enqueued_jobs_message(adapter, enqueued_jobs) + else + "#{enqueued_jobs_message(adapter, enqueued_jobs)}. "\ + "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}" + end + else + failed_enqueue_count = jobs.size - enqueued_count + "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)} to #{adapter_name(adapter)}" + end + end + end + subscribe_log_level :enqueue_all, :info + def perform_start(event) info do job = event.payload[:job] @@ -111,7 +137,11 @@ def discard(event) private def queue_name(event) - event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})" + adapter_name(event.payload[:adapter]) + "(#{event.payload[:job].queue_name})" + end + + def adapter_name(adapter) + adapter.class.name.demodulize.delete_suffix("Adapter") end def args_info(job) @@ -171,6 +201,13 @@ def log_enqueue_source def extract_enqueue_source_location(locations) backtrace_cleaner.clean(locations.lazy).first end + + def enqueued_jobs_message(adapter, enqueued_jobs) + enqueued_count = enqueued_jobs.size + job_classes_counts = enqueued_jobs.map(&:class).tally.sort_by { |_k, v| -v } + "Enqueued #{enqueued_count} #{'job'.pluralize(enqueued_count)} to #{adapter_name(adapter)}"\ + " (#{job_classes_counts.map { |klass, count| "#{count} #{klass}" }.join(', ')})" + end end end diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index a089377995..3c73b2e529 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -33,30 +33,34 @@ def enqueue_at(job, timestamp) # :nodoc: end def enqueue_all(jobs) # :nodoc: + enqueued_count = 0 jobs.group_by(&:class).each do |job_class, same_class_jobs| same_class_jobs.group_by(&:queue_name).each do |queue, same_class_and_queue_jobs| immediate_jobs, scheduled_jobs = same_class_and_queue_jobs.partition { |job| job.scheduled_at.nil? } if immediate_jobs.any? - Sidekiq::Client.push_bulk( + jids = Sidekiq::Client.push_bulk( "class" => JobWrapper, "wrapped" => job_class, "queue" => queue, "args" => immediate_jobs.map { |job| [job.serialize] }, ) + enqueued_count += jids.compact.size end if scheduled_jobs.any? - Sidekiq::Client.push_bulk( + jids = Sidekiq::Client.push_bulk( "class" => JobWrapper, "wrapped" => job_class, "queue" => queue, "args" => scheduled_jobs.map { |job| [job.serialize] }, "at" => scheduled_jobs.map { |job| job.scheduled_at } ) + enqueued_count += jids.compact.size end end end + enqueued_count end class JobWrapper # :nodoc: diff --git a/activejob/test/cases/logging_test.rb b/activejob/test/cases/logging_test.rb index c7c16c3cbb..c30c609282 100644 --- a/activejob/test/cases/logging_test.rb +++ b/activejob/test/cases/logging_test.rb @@ -11,6 +11,7 @@ require "jobs/retry_job" require "jobs/disable_log_job" require "jobs/abort_before_enqueue_job" +require "jobs/enqueue_error_job" require "models/person" class LoggingTest < ActiveSupport::TestCase @@ -298,6 +299,28 @@ def test_discard_logging end end + def test_enqueue_all_job_logging_some_jobs_failed_enqueuing + EnqueueErrorJob.disable_test_adapter + + EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [false, true] + + ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new) + assert_match(/Enqueued 1 job to .+ \(1 EnqueueErrorJob\)\. Failed enqueuing 1 job/, @logger.messages) + ensure + EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [] + end + + def test_enqueue_all_job_logging_all_jobs_failed_enqueuing + EnqueueErrorJob.disable_test_adapter + + EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [true, true] + + ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new) + assert_match(/Failed enqueuing 2 jobs to .+/, @logger.messages) + ensure + EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [] + end + def test_verbose_enqueue_logs ActiveJob.verbose_enqueue_logs = true @@ -311,4 +334,9 @@ def test_verbose_enqueue_logs_disabled_by_default LoggingJob.perform_later "Dummy" assert_no_match("↳", @logger.messages) end + + def test_enqueue_all_job_logging + ActiveJob.perform_all_later(LoggingJob.new("Dummy"), HelloJob.new("Jamie"), HelloJob.new("John")) + assert_match(/Enqueued 3 jobs to .+ \(2 HelloJob, 1 LoggingJob\)/, @logger.messages) + end end diff --git a/activejob/test/cases/queuing_test.rb b/activejob/test/cases/queuing_test.rb index 5e7f2cbbe7..675496cbcb 100644 --- a/activejob/test/cases/queuing_test.rb +++ b/activejob/test/cases/queuing_test.rb @@ -70,4 +70,24 @@ class QueuingTest < ActiveSupport::TestCase ActiveJob.perform_all_later([HelloJob.new("Jamie"), MultipleKwargsJob.new(argument1: "John", argument2: 42)]) assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort end + + test "perform_all_later instrumentation" do + jobs = HelloJob.new("Jamie"), HelloJob.new("John") + called = false + + subscriber = lambda do |*args| + called = true + event = ActiveSupport::Notifications::Event.new(*args) + payload = event.payload + assert payload[:adapter] + assert_equal jobs, payload[:jobs] + assert_equal 2, payload[:enqueued_count] + end + + ActiveSupport::Notifications.subscribed(subscriber, "enqueue_all.active_job") do + ActiveJob.perform_all_later(jobs) + end + + assert called + end end diff --git a/activejob/test/jobs/enqueue_error_job.rb b/activejob/test/jobs/enqueue_error_job.rb index 7d61404263..f6be4b0165 100644 --- a/activejob/test/jobs/enqueue_error_job.rb +++ b/activejob/test/jobs/enqueue_error_job.rb @@ -3,17 +3,25 @@ class EnqueueErrorJob < ActiveJob::Base class EnqueueErrorAdapter class << self - def enqueue(*) - raise ActiveJob::EnqueueError, "There was an error enqueuing the job" - end - - def enqueue_at(*) - raise ActiveJob::EnqueueError, "There was an error enqueuing the job" - end + attr_accessor :should_raise_sequence end + self.should_raise_sequence = [] + + def enqueue(*) + raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise? + end + + def enqueue_at(*) + raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise? + end + + private + def should_raise? + self.class.should_raise_sequence.empty? || self.class.should_raise_sequence.shift + end end - self.queue_adapter = EnqueueErrorAdapter + self.queue_adapter = EnqueueErrorAdapter.new def perform raise "This should never be called"