Add callbacks, implement instrumentation as callbacks, and have the enqueue methods return a job instance

This commit is contained in:
David Heinemeier Hansson 2014-05-22 19:33:23 +02:00
parent f82bc7e551
commit c073ba339a
17 changed files with 204 additions and 54 deletions

@ -3,14 +3,17 @@
require 'active_job/enqueuing' require 'active_job/enqueuing'
require 'active_job/performing' require 'active_job/performing'
require 'active_job/logging' require 'active_job/logging'
require 'active_job/callbacks'
module ActiveJob module ActiveJob
class Base class Base
extend QueueAdapter extend QueueAdapter
extend QueueName extend QueueName
extend Enqueuing
include Enqueuing
include Performing include Performing
extend Logging include Callbacks
include Logging
ActiveSupport.run_load_hooks(:active_job, self) ActiveSupport.run_load_hooks(:active_job, self)
end end

@ -0,0 +1,40 @@
require 'active_support/callbacks'
module ActiveJob
module Callbacks
extend ActiveSupport::Concern
include ActiveSupport::Callbacks
included do
define_callbacks :perform
define_callbacks :enqueue
end
module ClassMethods
def before_perform(*filters, &blk)
set_callback(:perform, :before, *filters, &blk)
end
def after_perform(*filters, &blk)
set_callback(:perform, :after, *filters, &blk)
end
def around_perform(*filters, &blk)
set_callback(:perform, :around, *filters, &blk)
end
def before_enqueue(*filters, &blk)
set_callback(:enqueue, :before, *filters, &blk)
end
def after_enqueue(*filters, &blk)
set_callback(:enqueue, :after, *filters, &blk)
end
def around_enqueue(*filters, &blk)
set_callback(:enqueue, :around, *filters, &blk)
end
end
end
end

@ -2,42 +2,58 @@
module ActiveJob module ActiveJob
module Enqueuing module Enqueuing
# Push a job onto the queue. The arguments must be legal JSON types extend ActiveSupport::Concern
# (string, int, float, nil, true, false, hash or array) or
# ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects module ClassMethods
# are not supported. # Push a job onto the queue. The arguments must be legal JSON types
# # (string, int, float, nil, true, false, hash or array) or
# The return value is adapter-specific and may change in a future # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects
# ActiveJob release. # are not supported.
def enqueue(*args) #
serialized_args = Parameters.serialize(args) # Returns an instance of the job class queued with args available in
instrument_enqueuing :enqueue, args: serialized_args # Job#arguments.
queue_adapter.enqueue self, *serialized_args def enqueue(*args)
end new(args).tap do |job|
job.run_callbacks :enqueue do
queue_adapter.enqueue self, *Parameters.serialize(args)
end
end
end
# Enqueue a job to be performed at +interval+ from now. # Enqueue a job to be performed at +interval+ from now.
# #
# enqueue_in(1.week, "mike") # enqueue_in(1.week, "mike")
# #
# Returns truthy if a job was scheduled. # Returns an instance of the job class queued with args available in
def enqueue_in(interval, *args) # Job#arguments and the timestamp in Job#enqueue_at.
enqueue_at(interval.seconds.from_now, *args) def enqueue_in(interval, *args)
end enqueue_at interval.seconds.from_now, *args
end
# Enqueue a job to be performed at an explicit point in time. # Enqueue a job to be performed at an explicit point in time.
# #
# enqueue_at(Date.tomorrow.midnight, "mike") # enqueue_at(Date.tomorrow.midnight, "mike")
# #
# Returns truthy if a job was scheduled. # Returns an instance of the job class queued with args available in
def enqueue_at(timestamp, *args) # Job#arguments and the timestamp in Job#enqueue_at.
serialized_args = Parameters.serialize(args) def enqueue_at(timestamp, *args)
instrument_enqueuing :enqueue_at, args: serialized_args, timestamp: timestamp new(args).tap do |job|
queue_adapter.enqueue_at self, timestamp.to_f, *serialized_args job.enqueued_at = timestamp
job.run_callbacks :enqueue do
queue_adapter.enqueue_at self, timestamp.to_f, *Parameters.serialize(args)
end
end
end
end end
private included do
def instrument_enqueuing(method_name, options = {}) attr_accessor :arguments
ActiveSupport::Notifications.instrument "#{method_name}.active_job", options.merge(adapter: queue_adapter, job: self) attr_accessor :enqueued_at
end end
def initialize(arguments = nil)
@arguments = arguments
end
end end
end end

@ -2,7 +2,29 @@
module ActiveJob module ActiveJob
module Logging module Logging
mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) } extend ActiveSupport::Concern
module ClassMethods
mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) }
end
included do
before_enqueue do |job|
if job.enqueued_at
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at
else
ActiveSupport::Notifications.instrument "enqueue.active_job",
adapter: job.class.queue_adapter, job: job.class, args: job.arguments
end
end
before_perform do |job|
ActiveSupport::Notifications.instrument "perform.active_job",
adapter: job.class.queue_adapter, job: job.class, args: job.arguments
end
end
class LogSubscriber < ActiveSupport::LogSubscriber class LogSubscriber < ActiveSupport::LogSubscriber
def enqueue(event) def enqueue(event)
@ -17,6 +39,7 @@ def perform(event)
info "Performed #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) info "Performed #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event)
end end
private private
def queue_name(event) def queue_name(event)
event.payload[:adapter].name.demodulize.remove('Adapter') event.payload[:adapter].name.demodulize.remove('Adapter')

@ -2,18 +2,16 @@
module ActiveJob module ActiveJob
module Performing module Performing
def perform_with_deserialization(*serialized_args) def perform_with_hooks(*serialized_args)
instrument_performing serialized_args self.arguments = Parameters.deserialize(serialized_args)
perform *Parameters.deserialize(serialized_args)
run_callbacks :perform do
perform *arguments
end
end end
def perform(*) def perform(*)
raise NotImplementedError raise NotImplementedError
end end
private
def instrument_performing(args)
ActiveSupport::Notifications.instrument "perform.active_job", adapter: self.class.queue_adapter, job: self.class, args: args
end
end end
end end

@ -16,7 +16,7 @@ def enqueue_at(job, timestamp, *args)
class JobWrapper class JobWrapper
class << self class << self
def perform(job_name, *args) def perform(job_name, *args)
job_name.constantize.new.perform_with_deserialization *args job_name.constantize.new.perform_with_hooks *args
end end
end end
end end

@ -15,7 +15,7 @@ def enqueue_at(job, timestamp, *args)
class JobWrapper class JobWrapper
def perform(job, *args) def perform(job, *args)
job.new.perform_with_deserialization *args job.new.perform_with_hooks *args
end end
end end
end end

@ -3,7 +3,7 @@ module QueueAdapters
class InlineAdapter class InlineAdapter
class << self class << self
def enqueue(job, *args) def enqueue(job, *args)
job.new.perform_with_deserialization *args job.new.perform_with_hooks *args
end end
def enqueue_at(job, timestamp, *args) def enqueue_at(job, timestamp, *args)
@ -11,7 +11,7 @@ def enqueue_at(job, timestamp, *args)
begin begin
interval = Time.now.to_f - timestamp interval = Time.now.to_f - timestamp
sleep(interval) if interval > 0 sleep(interval) if interval > 0
job.new.perform_with_deserialization *args job.new.perform_with_hooks *args
rescue => e rescue => e
ActiveJob::Base.logger.info "Error performing #{job}: #{e.message}" ActiveJob::Base.logger.info "Error performing #{job}: #{e.message}"
end end

@ -15,7 +15,7 @@ def enqueue_at(job, timestamp, *args)
class JobWrapper < Que::Job class JobWrapper < Que::Job
def run(job, *args) def run(job, *args)
job.new.perform_with_deserialization *args job.new.perform_with_hooks *args
end end
end end
end end

@ -15,7 +15,7 @@ def enqueue_at(job, timestamp, *args)
class JobWrapper class JobWrapper
def self.perform(job, *args) def self.perform(job, *args)
job.new.perform_with_deserialization *args job.new.perform_with_hooks *args
end end
end end
end end

@ -19,7 +19,7 @@ def enqueue_at(job, timestamp, *args)
class JobWrapper class JobWrapper
class << self class << self
def perform(job_name, *args) def perform(job_name, *args)
job_name.constantize.new.perform_with_deserialization *args job_name.constantize.new.perform_with_hooks *args
end end
end end

@ -26,7 +26,7 @@ class JobWrapper
include Sidekiq::Worker include Sidekiq::Worker
def perform(job_name, *args) def perform(job_name, *args)
job_name.constantize.new.perform_with_deserialization *args job_name.constantize.new.perform_with_hooks *args
end end
end end
end end

@ -23,7 +23,7 @@ class JobWrapper
include Sneakers::Worker include Sneakers::Worker
def work(job, *args) def work(job, *args)
job.new.perform_with_deserialization *args job.new.perform_with_hooks *args
end end
end end
end end

@ -17,7 +17,7 @@ class JobWrapper
include SuckerPunch::Job include SuckerPunch::Job
def perform(job, *args) def perform(job, *args)
job.new.perform_with_deserialization *args job.new.perform_with_hooks *args
end end
end end
end end

@ -0,0 +1,23 @@
require 'helper'
require 'active_job/parameters'
require 'jobs/callback_job'
require 'active_support/core_ext/object/inclusion'
class CallbacksTest < ActiveSupport::TestCase
test 'perform callbacks' do
performed_callback_job = CallbackJob.new.tap { |j| j.perform_with_hooks }
assert "CallbackJob ran before_perform".in? performed_callback_job.history
assert "CallbackJob ran after_perform".in? performed_callback_job.history
assert "CallbackJob ran around_perform_start".in? performed_callback_job.history
assert "CallbackJob ran around_perform_stop".in? performed_callback_job.history
end
test 'enqueue callbacks' do
enqueued_callback_job = CallbackJob.enqueue
assert "CallbackJob ran before_enqueue".in? enqueued_callback_job.history
assert "CallbackJob ran after_enqueue".in? enqueued_callback_job.history
assert "CallbackJob ran around_enqueue_start".in? enqueued_callback_job.history
assert "CallbackJob ran around_enqueue_stop".in? enqueued_callback_job.history
end
end

@ -26,4 +26,19 @@ class QueuingTest < ActiveSupport::TestCase
skip skip
end end
end end
test 'job returned by enqueue has the arguments available' do
job = HelloJob.enqueue "Jamie"
assert_equal [ "Jamie" ], job.arguments
end
test 'job returned by enqueue_at has the timestamp available' do
begin
job = HelloJob.enqueue_at Time.utc(2014, 1, 1)
assert_equal Time.utc(2014, 1, 1), job.enqueued_at
rescue NotImplementedError
skip
end
end
end end

32
test/jobs/callback_job.rb Normal file

@ -0,0 +1,32 @@
class CallbackJob < ActiveJob::Base
before_perform ->(job) { job.history << "CallbackJob ran before_perform" }
after_perform ->(job) { job.history << "CallbackJob ran after_perform" }
before_enqueue ->(job) { job.history << "CallbackJob ran before_enqueue" }
after_enqueue ->(job) { job.history << "CallbackJob ran after_enqueue" }
around_perform :around_perform
around_enqueue :around_enqueue
def perform(person = "david")
# NOTHING!
end
def history
@history ||= []
end
# FIXME: Not sure why these can't be declared inline like before/after
def around_perform
history << "CallbackJob ran around_perform_start"
yield
history << "CallbackJob ran around_perform_stop"
end
def around_enqueue
history << "CallbackJob ran around_enqueue_start"
yield
history << "CallbackJob ran around_enqueue_stop"
end
end