ActiveJob::QueueAdapters::* are no longer singletons

This commit is contained in:
Tamir Duberstein 2015-03-11 14:57:13 -07:00
parent c696cffcf6
commit fb26645c1e
18 changed files with 181 additions and 160 deletions

@ -81,7 +81,7 @@ def perform(event)
private
def queue_name(event)
event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
event.payload[:adapter].class.name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
end
def args_info(job)

@ -14,20 +14,40 @@ module ClassMethods
# Specify the backend queue provider. The default queue adapter
# is the :inline queue. See QueueAdapters for more
# information.
def queue_adapter=(name_or_adapter)
@@queue_adapter = \
case name_or_adapter
when Symbol, String
load_adapter(name_or_adapter)
def queue_adapter=(name_or_adapter_or_class)
case name_or_adapter_or_class
when Symbol, String
self.queue_adapter = load_adapter(name_or_adapter_or_class)
else
@@queue_adapter = if queue_adapter?(name_or_adapter_or_class)
name_or_adapter_or_class
elsif queue_adapter_class?(name_or_adapter_or_class)
ActiveSupport::Deprecation.warn "Passing an adapter class is deprecated " \
"and will be removed in Rails 5.1. Please pass an adapter name " \
"(.queue_adapter = :#{name_or_adapter_or_class.name.demodulize.remove('Adapter').underscore}) " \
"or an instance (.queue_adapter = #{name_or_adapter_or_class.name}.new) instead."
name_or_adapter_or_class.new
else
name_or_adapter if name_or_adapter.respond_to?(:enqueue)
raise ArgumentError
end
end
end
private
def load_adapter(name)
"ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize
end
QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze
def queue_adapter?(object)
QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
end
def queue_adapter_class?(object)
object.is_a?(Class) && QUEUE_ADAPTER_METHODS.all? { |meth| object.public_method_defined?(meth) }
end
def load_adapter(name)
"ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize.new
end
end
end
end

@ -13,15 +13,13 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :backburner
class BackburnerAdapter
class << self
def enqueue(job) #:nodoc:
Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
end
def enqueue(job) #:nodoc:
Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
end
def enqueue_at(job, timestamp) #:nodoc:
delay = timestamp - Time.current.to_f
Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
end
def enqueue_at(job, timestamp) #:nodoc:
delay = timestamp - Time.current.to_f
Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
end
class JobWrapper #:nodoc:

@ -13,14 +13,12 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
class << self
def enqueue(job) #:nodoc:
Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
end
def enqueue(job) #:nodoc:
Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
end
def enqueue_at(job, timestamp) #:nodoc:
Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
end
def enqueue_at(job, timestamp) #:nodoc:
Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
end
class JobWrapper #:nodoc:

@ -9,14 +9,12 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :inline
class InlineAdapter
class << self
def enqueue(job) #:nodoc:
Base.execute(job.serialize)
end
def enqueue(job) #:nodoc:
Base.execute(job.serialize)
end
def enqueue_at(*) #:nodoc:
raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html")
end
def enqueue_at(*) #:nodoc:
raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html")
end
end
end

@ -16,16 +16,14 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :qu
class QuAdapter
class << self
def enqueue(job, *args) #:nodoc:
Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end
def enqueue(job, *args) #:nodoc:
Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end
def enqueue_at(job, timestamp, *args) #:nodoc:
raise NotImplementedError
end
def enqueue_at(job, timestamp, *args) #:nodoc:
raise NotImplementedError
end
class JobWrapper < Qu::Job #:nodoc:

@ -15,14 +15,12 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :que
class QueAdapter
class << self
def enqueue(job) #:nodoc:
JobWrapper.enqueue job.serialize, queue: job.queue_name
end
def enqueue(job) #:nodoc:
JobWrapper.enqueue job.serialize, queue: job.queue_name
end
def enqueue_at(job, timestamp) #:nodoc:
JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp)
end
def enqueue_at(job, timestamp) #:nodoc:
JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp)
end
class JobWrapper < Que::Job #:nodoc:

@ -17,29 +17,27 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :queue_classic
class QueueClassicAdapter
class << self
def enqueue(job) #:nodoc:
build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
end
def enqueue(job) #:nodoc:
build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
end
def enqueue_at(job, timestamp) #:nodoc:
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \
'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
'You can implement this yourself or you can use the queue_classic-later gem.'
end
queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
def enqueue_at(job, timestamp) #:nodoc:
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \
'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
'You can implement this yourself or you can use the queue_classic-later gem.'
end
queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
end
# Builds a <tt>QC::Queue</tt> object to schedule jobs on.
#
# If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass
# <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the
# <tt>build_queue</tt> method.
def build_queue(queue_name)
QC::Queue.new(queue_name)
end
# Builds a <tt>QC::Queue</tt> object to schedule jobs on.
#
# If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass
# <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the
# <tt>build_queue</tt> method.
def build_queue(queue_name)
QC::Queue.new(queue_name)
end
class JobWrapper #:nodoc:

@ -26,18 +26,16 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :resque
class ResqueAdapter
class << self
def enqueue(job) #:nodoc:
Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
def enqueue(job) #:nodoc:
Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
def enqueue_at(job, timestamp) #:nodoc:
unless Resque.respond_to?(:enqueue_at_with_queue)
raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
"resque-scheduler gem. Please add it to your Gemfile and run bundle install"
end
Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
def enqueue_at(job, timestamp) #:nodoc:
unless Resque.respond_to?(:enqueue_at_with_queue)
raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
"resque-scheduler gem. Please add it to your Gemfile and run bundle install"
end
Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
end
class JobWrapper #:nodoc:

@ -15,22 +15,20 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :sidekiq
class SidekiqAdapter
class << self
def enqueue(job) #:nodoc:
#Sidekiq::Client does not support symbols as keys
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
'args' => [ job.serialize ]
end
def enqueue(job) #:nodoc:
#Sidekiq::Client does not support symbols as keys
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
'args' => [ job.serialize ]
end
def enqueue_at(job, timestamp) #:nodoc:
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
'args' => [ job.serialize ],
'at' => timestamp
end
def enqueue_at(job, timestamp) #:nodoc:
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
'args' => [ job.serialize ],
'at' => timestamp
end
class JobWrapper #:nodoc:

@ -16,19 +16,19 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :sneakers
class SneakersAdapter
@monitor = Monitor.new
def initialize
@monitor = Monitor.new
end
class << self
def enqueue(job) #:nodoc:
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
def enqueue(job) #:nodoc:
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
end
def enqueue_at(job, timestamp) #:nodoc:
raise NotImplementedError
end
def enqueue_at(job, timestamp) #:nodoc:
raise NotImplementedError
end
class JobWrapper #:nodoc:

@ -18,14 +18,12 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :sucker_punch
class SuckerPunchAdapter
class << self
def enqueue(job) #:nodoc:
JobWrapper.new.async.perform job.serialize
end
def enqueue(job) #:nodoc:
JobWrapper.new.async.perform job.serialize
end
def enqueue_at(job, timestamp) #:nodoc:
raise NotImplementedError
end
def enqueue_at(job, timestamp) #:nodoc:
raise NotImplementedError
end
class JobWrapper #:nodoc:

@ -10,52 +10,50 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :test
class TestAdapter
class << self
attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter)
attr_writer(:enqueued_jobs, :performed_jobs)
attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter)
attr_writer(:enqueued_jobs, :performed_jobs)
# Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
def enqueued_jobs
@enqueued_jobs ||= []
# Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
def enqueued_jobs
@enqueued_jobs ||= []
end
# Provides a store of all the performed jobs with the TestAdapter so you can check them.
def performed_jobs
@performed_jobs ||= []
end
def enqueue(job) #:nodoc:
return if filtered?(job)
job_data = job_to_hash(job)
enqueue_or_perform(perform_enqueued_jobs, job, job_data)
end
def enqueue_at(job, timestamp) #:nodoc:
return if filtered?(job)
job_data = job_to_hash(job, at: timestamp)
enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
end
private
def job_to_hash(job, extras = {})
{ job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras)
end
def enqueue_or_perform(perform, job, job_data)
if perform
performed_jobs << job_data
Base.execute job.serialize
else
enqueued_jobs << job_data
end
end
# Provides a store of all the performed jobs with the TestAdapter so you can check them.
def performed_jobs
@performed_jobs ||= []
end
def enqueue(job) #:nodoc:
return if filtered?(job)
job_data = job_to_hash(job)
enqueue_or_perform(perform_enqueued_jobs, job, job_data)
end
def enqueue_at(job, timestamp) #:nodoc:
return if filtered?(job)
job_data = job_to_hash(job, at: timestamp)
enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
end
private
def job_to_hash(job, extras = {})
{ job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras)
end
def enqueue_or_perform(perform, job, job_data)
if perform
performed_jobs << job_data
Base.execute job.serialize
else
enqueued_jobs << job_data
end
end
def filtered?(job)
filter && !Array(filter).include?(job.class)
end
def filtered?(job)
filter && !Array(filter).include?(job.class)
end
end
end

@ -11,9 +11,6 @@ def before_setup
ActiveJob::Base.queue_adapter = :test
clear_enqueued_jobs
clear_performed_jobs
queue_adapter.perform_enqueued_jobs = false
queue_adapter.perform_enqueued_at_jobs = false
queue_adapter.filter = nil
super
end

@ -2,6 +2,6 @@
class AdapterTest < ActiveSupport::TestCase
test "should load #{ENV['AJ_ADAPTER']} adapter" do
assert_equal "active_job/queue_adapters/#{ENV['AJ_ADAPTER']}_adapter".classify, ActiveJob::Base.queue_adapter.name
assert_equal "active_job/queue_adapters/#{ENV['AJ_ADAPTER']}_adapter".classify, ActiveJob::Base.queue_adapter.class.name
end
end

@ -0,0 +1,24 @@
require 'helper'
class QueueAdapterTest < ActiveJob::TestCase
test 'should forbid nonsense arguments' do
assert_raises(ArgumentError) { ActiveJob::Base.queue_adapter = Mutex }
assert_raises(ArgumentError) { ActiveJob::Base.queue_adapter = Mutex.new }
end
test 'should warn on passing an adapter class' do
klass = Class.new do
def self.name
'fake'
end
def enqueue(*)
end
def enqueue_at(*)
end
end
assert_deprecated { ActiveJob::Base.queue_adapter = klass }
end
end

@ -9,6 +9,6 @@ def test_include_helper
end
def test_set_test_adapter
assert_equal ActiveJob::QueueAdapters::TestAdapter, self.queue_adapter
assert_kind_of ActiveJob::QueueAdapters::TestAdapter, self.queue_adapter
end
end

@ -27,8 +27,8 @@ def clear_jobs
jobs_manager.clear_jobs
end
def adapter_is?(adapter)
ActiveJob::Base.queue_adapter.name.split("::").last.gsub(/Adapter$/, '').underscore==adapter.to_s
def adapter_is?(adapter_class_symbol)
ActiveJob::Base.queue_adapter.class.name.split("::").last.gsub(/Adapter$/, '').underscore == adapter_class_symbol.to_s
end
def wait_for_jobs_to_finish_for(seconds=60)