From c073ba339a6820625718f7320989cfa527534563 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Thu, 22 May 2014 19:33:23 +0200 Subject: [PATCH] Add callbacks, implement instrumentation as callbacks, and have the enqueue methods return a job instance --- lib/active_job/base.rb | 7 +- lib/active_job/callbacks.rb | 40 +++++++++ lib/active_job/enqueuing.rb | 82 +++++++++++-------- lib/active_job/logging.rb | 25 +++++- lib/active_job/performing.rb | 14 ++-- .../queue_adapters/backburner_adapter.rb | 2 +- .../queue_adapters/delayed_job_adapter.rb | 2 +- .../queue_adapters/inline_adapter.rb | 4 +- lib/active_job/queue_adapters/que_adapter.rb | 2 +- .../queue_adapters/queue_classic_adapter.rb | 2 +- .../queue_adapters/resque_adapter.rb | 2 +- .../queue_adapters/sidekiq_adapter.rb | 2 +- .../queue_adapters/sneakers_adapter.rb | 2 +- .../queue_adapters/sucker_punch_adapter.rb | 2 +- test/cases/callbacks_test.rb | 23 ++++++ test/cases/queuing_test.rb | 15 ++++ test/jobs/callback_job.rb | 32 ++++++++ 17 files changed, 204 insertions(+), 54 deletions(-) create mode 100644 lib/active_job/callbacks.rb create mode 100644 test/cases/callbacks_test.rb create mode 100644 test/jobs/callback_job.rb diff --git a/lib/active_job/base.rb b/lib/active_job/base.rb index e6b02708a1..1b88bc5bcc 100644 --- a/lib/active_job/base.rb +++ b/lib/active_job/base.rb @@ -3,14 +3,17 @@ require 'active_job/enqueuing' require 'active_job/performing' require 'active_job/logging' +require 'active_job/callbacks' module ActiveJob class Base extend QueueAdapter extend QueueName - extend Enqueuing + + include Enqueuing include Performing - extend Logging + include Callbacks + include Logging ActiveSupport.run_load_hooks(:active_job, self) end diff --git a/lib/active_job/callbacks.rb b/lib/active_job/callbacks.rb new file mode 100644 index 0000000000..c69e4a3b55 --- /dev/null +++ b/lib/active_job/callbacks.rb @@ -0,0 +1,40 @@ +require 'active_support/callbacks' + +module ActiveJob + module Callbacks + extend ActiveSupport::Concern + include ActiveSupport::Callbacks + + included do + define_callbacks :perform + define_callbacks :enqueue + end + + module ClassMethods + def before_perform(*filters, &blk) + set_callback(:perform, :before, *filters, &blk) + end + + def after_perform(*filters, &blk) + set_callback(:perform, :after, *filters, &blk) + end + + def around_perform(*filters, &blk) + set_callback(:perform, :around, *filters, &blk) + end + + + def before_enqueue(*filters, &blk) + set_callback(:enqueue, :before, *filters, &blk) + end + + def after_enqueue(*filters, &blk) + set_callback(:enqueue, :after, *filters, &blk) + end + + def around_enqueue(*filters, &blk) + set_callback(:enqueue, :around, *filters, &blk) + end + end + end +end \ No newline at end of file diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb index e8f3272782..8b1f29ce77 100644 --- a/lib/active_job/enqueuing.rb +++ b/lib/active_job/enqueuing.rb @@ -2,42 +2,58 @@ module ActiveJob module Enqueuing - # Push a job onto the queue. The arguments must be legal JSON types - # (string, int, float, nil, true, false, hash or array) or - # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects - # are not supported. - # - # The return value is adapter-specific and may change in a future - # ActiveJob release. - def enqueue(*args) - serialized_args = Parameters.serialize(args) - instrument_enqueuing :enqueue, args: serialized_args - queue_adapter.enqueue self, *serialized_args - end + extend ActiveSupport::Concern + + module ClassMethods + # Push a job onto the queue. The arguments must be legal JSON types + # (string, int, float, nil, true, false, hash or array) or + # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects + # are not supported. + # + # Returns an instance of the job class queued with args available in + # Job#arguments. + def enqueue(*args) + new(args).tap do |job| + job.run_callbacks :enqueue do + queue_adapter.enqueue self, *Parameters.serialize(args) + end + end + end - # Enqueue a job to be performed at +interval+ from now. - # - # enqueue_in(1.week, "mike") - # - # Returns truthy if a job was scheduled. - def enqueue_in(interval, *args) - enqueue_at(interval.seconds.from_now, *args) - end + # Enqueue a job to be performed at +interval+ from now. + # + # enqueue_in(1.week, "mike") + # + # Returns an instance of the job class queued with args available in + # Job#arguments and the timestamp in Job#enqueue_at. + def enqueue_in(interval, *args) + enqueue_at interval.seconds.from_now, *args + end - # Enqueue a job to be performed at an explicit point in time. - # - # enqueue_at(Date.tomorrow.midnight, "mike") - # - # Returns truthy if a job was scheduled. - def enqueue_at(timestamp, *args) - serialized_args = Parameters.serialize(args) - instrument_enqueuing :enqueue_at, args: serialized_args, timestamp: timestamp - queue_adapter.enqueue_at self, timestamp.to_f, *serialized_args + # Enqueue a job to be performed at an explicit point in time. + # + # enqueue_at(Date.tomorrow.midnight, "mike") + # + # Returns an instance of the job class queued with args available in + # Job#arguments and the timestamp in Job#enqueue_at. + def enqueue_at(timestamp, *args) + new(args).tap do |job| + job.enqueued_at = timestamp + + job.run_callbacks :enqueue do + queue_adapter.enqueue_at self, timestamp.to_f, *Parameters.serialize(args) + end + end + end end - private - def instrument_enqueuing(method_name, options = {}) - ActiveSupport::Notifications.instrument "#{method_name}.active_job", options.merge(adapter: queue_adapter, job: self) - end + included do + attr_accessor :arguments + attr_accessor :enqueued_at + end + + def initialize(arguments = nil) + @arguments = arguments + end end end diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb index f3cc599a14..f4a33ffe19 100644 --- a/lib/active_job/logging.rb +++ b/lib/active_job/logging.rb @@ -2,7 +2,29 @@ module ActiveJob module Logging - mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) } + extend ActiveSupport::Concern + + module ClassMethods + mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) } + end + + included do + before_enqueue do |job| + if job.enqueued_at + ActiveSupport::Notifications.instrument "enqueue_at.active_job", + adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at + else + ActiveSupport::Notifications.instrument "enqueue.active_job", + adapter: job.class.queue_adapter, job: job.class, args: job.arguments + end + end + + before_perform do |job| + ActiveSupport::Notifications.instrument "perform.active_job", + adapter: job.class.queue_adapter, job: job.class, args: job.arguments + end + end + class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) @@ -17,6 +39,7 @@ def perform(event) info "Performed #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) end + private def queue_name(event) event.payload[:adapter].name.demodulize.remove('Adapter') diff --git a/lib/active_job/performing.rb b/lib/active_job/performing.rb index eca311578d..126193995c 100644 --- a/lib/active_job/performing.rb +++ b/lib/active_job/performing.rb @@ -2,18 +2,16 @@ module ActiveJob module Performing - def perform_with_deserialization(*serialized_args) - instrument_performing serialized_args - perform *Parameters.deserialize(serialized_args) + def perform_with_hooks(*serialized_args) + self.arguments = Parameters.deserialize(serialized_args) + + run_callbacks :perform do + perform *arguments + end end def perform(*) raise NotImplementedError end - - private - def instrument_performing(args) - ActiveSupport::Notifications.instrument "perform.active_job", adapter: self.class.queue_adapter, job: self.class, args: args - end end end diff --git a/lib/active_job/queue_adapters/backburner_adapter.rb b/lib/active_job/queue_adapters/backburner_adapter.rb index 5230acc625..b7e963cd6f 100644 --- a/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/lib/active_job/queue_adapters/backburner_adapter.rb @@ -16,7 +16,7 @@ def enqueue_at(job, timestamp, *args) class JobWrapper class << self def perform(job_name, *args) - job_name.constantize.new.perform_with_deserialization *args + job_name.constantize.new.perform_with_hooks *args end end end diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 5a9c4c708d..fa08d779fb 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -15,7 +15,7 @@ def enqueue_at(job, timestamp, *args) class JobWrapper def perform(job, *args) - job.new.perform_with_deserialization *args + job.new.perform_with_hooks *args end end end diff --git a/lib/active_job/queue_adapters/inline_adapter.rb b/lib/active_job/queue_adapters/inline_adapter.rb index d826ce51b4..8b82d7c25a 100644 --- a/lib/active_job/queue_adapters/inline_adapter.rb +++ b/lib/active_job/queue_adapters/inline_adapter.rb @@ -3,7 +3,7 @@ module QueueAdapters class InlineAdapter class << self def enqueue(job, *args) - job.new.perform_with_deserialization *args + job.new.perform_with_hooks *args end def enqueue_at(job, timestamp, *args) @@ -11,7 +11,7 @@ def enqueue_at(job, timestamp, *args) begin interval = Time.now.to_f - timestamp sleep(interval) if interval > 0 - job.new.perform_with_deserialization *args + job.new.perform_with_hooks *args rescue => e ActiveJob::Base.logger.info "Error performing #{job}: #{e.message}" end diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb index 9dd57d65f3..adb9125666 100644 --- a/lib/active_job/queue_adapters/que_adapter.rb +++ b/lib/active_job/queue_adapters/que_adapter.rb @@ -15,7 +15,7 @@ def enqueue_at(job, timestamp, *args) class JobWrapper < Que::Job def run(job, *args) - job.new.perform_with_deserialization *args + job.new.perform_with_hooks *args end end end diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb index eacc6b5548..01d6d30caf 100644 --- a/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -15,7 +15,7 @@ def enqueue_at(job, timestamp, *args) class JobWrapper def self.perform(job, *args) - job.new.perform_with_deserialization *args + job.new.perform_with_hooks *args end end end diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb index 3b87f25b80..99da3c63ce 100644 --- a/lib/active_job/queue_adapters/resque_adapter.rb +++ b/lib/active_job/queue_adapters/resque_adapter.rb @@ -19,7 +19,7 @@ def enqueue_at(job, timestamp, *args) class JobWrapper class << self def perform(job_name, *args) - job_name.constantize.new.perform_with_deserialization *args + job_name.constantize.new.perform_with_hooks *args end end diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb index 74fbe632d6..2a2c2ce442 100644 --- a/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -26,7 +26,7 @@ class JobWrapper include Sidekiq::Worker def perform(job_name, *args) - job_name.constantize.new.perform_with_deserialization *args + job_name.constantize.new.perform_with_hooks *args end end end diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index 6bb575e907..ebd794fca1 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -23,7 +23,7 @@ class JobWrapper include Sneakers::Worker def work(job, *args) - job.new.perform_with_deserialization *args + job.new.perform_with_hooks *args end end end diff --git a/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/lib/active_job/queue_adapters/sucker_punch_adapter.rb index 30718fc05f..a166081f9f 100644 --- a/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -17,7 +17,7 @@ class JobWrapper include SuckerPunch::Job def perform(job, *args) - job.new.perform_with_deserialization *args + job.new.perform_with_hooks *args end end end diff --git a/test/cases/callbacks_test.rb b/test/cases/callbacks_test.rb new file mode 100644 index 0000000000..391c7e87c4 --- /dev/null +++ b/test/cases/callbacks_test.rb @@ -0,0 +1,23 @@ +require 'helper' +require 'active_job/parameters' +require 'jobs/callback_job' + +require 'active_support/core_ext/object/inclusion' + +class CallbacksTest < ActiveSupport::TestCase + test 'perform callbacks' do + performed_callback_job = CallbackJob.new.tap { |j| j.perform_with_hooks } + assert "CallbackJob ran before_perform".in? performed_callback_job.history + assert "CallbackJob ran after_perform".in? performed_callback_job.history + assert "CallbackJob ran around_perform_start".in? performed_callback_job.history + assert "CallbackJob ran around_perform_stop".in? performed_callback_job.history + end + + test 'enqueue callbacks' do + enqueued_callback_job = CallbackJob.enqueue + assert "CallbackJob ran before_enqueue".in? enqueued_callback_job.history + assert "CallbackJob ran after_enqueue".in? enqueued_callback_job.history + assert "CallbackJob ran around_enqueue_start".in? enqueued_callback_job.history + assert "CallbackJob ran around_enqueue_stop".in? enqueued_callback_job.history + end +end diff --git a/test/cases/queuing_test.rb b/test/cases/queuing_test.rb index 23df35a8df..029f60f246 100644 --- a/test/cases/queuing_test.rb +++ b/test/cases/queuing_test.rb @@ -26,4 +26,19 @@ class QueuingTest < ActiveSupport::TestCase skip end end + + test 'job returned by enqueue has the arguments available' do + job = HelloJob.enqueue "Jamie" + assert_equal [ "Jamie" ], job.arguments + end + + + test 'job returned by enqueue_at has the timestamp available' do + begin + job = HelloJob.enqueue_at Time.utc(2014, 1, 1) + assert_equal Time.utc(2014, 1, 1), job.enqueued_at + rescue NotImplementedError + skip + end + end end diff --git a/test/jobs/callback_job.rb b/test/jobs/callback_job.rb new file mode 100644 index 0000000000..056dd073e8 --- /dev/null +++ b/test/jobs/callback_job.rb @@ -0,0 +1,32 @@ +class CallbackJob < ActiveJob::Base + before_perform ->(job) { job.history << "CallbackJob ran before_perform" } + after_perform ->(job) { job.history << "CallbackJob ran after_perform" } + + before_enqueue ->(job) { job.history << "CallbackJob ran before_enqueue" } + after_enqueue ->(job) { job.history << "CallbackJob ran after_enqueue" } + + around_perform :around_perform + around_enqueue :around_enqueue + + + def perform(person = "david") + # NOTHING! + end + + def history + @history ||= [] + end + + # FIXME: Not sure why these can't be declared inline like before/after + def around_perform + history << "CallbackJob ran around_perform_start" + yield + history << "CallbackJob ran around_perform_stop" + end + + def around_enqueue + history << "CallbackJob ran around_enqueue_start" + yield + history << "CallbackJob ran around_enqueue_stop" + end +end