merge master
This commit is contained in:
commit
897f86b8d1
1
Gemfile
1
Gemfile
@ -11,3 +11,4 @@ gem 'delayed_job'
|
|||||||
gem 'queue_classic'
|
gem 'queue_classic'
|
||||||
gem 'sneakers', '0.1.1.pre'
|
gem 'sneakers', '0.1.1.pre'
|
||||||
gem 'que'
|
gem 'que'
|
||||||
|
gem 'backburner'
|
||||||
|
@ -21,12 +21,17 @@ GEM
|
|||||||
thread_safe (~> 0.1)
|
thread_safe (~> 0.1)
|
||||||
tzinfo (~> 1.1)
|
tzinfo (~> 1.1)
|
||||||
amq-protocol (1.9.2)
|
amq-protocol (1.9.2)
|
||||||
|
backburner (0.4.5)
|
||||||
|
beaneater (~> 0.3.1)
|
||||||
|
dante (~> 0.1.5)
|
||||||
|
beaneater (0.3.2)
|
||||||
builder (3.2.2)
|
builder (3.2.2)
|
||||||
bunny (1.1.9)
|
bunny (1.1.9)
|
||||||
amq-protocol (>= 1.9.2)
|
amq-protocol (>= 1.9.2)
|
||||||
celluloid (0.15.2)
|
celluloid (0.15.2)
|
||||||
timers (~> 1.1.0)
|
timers (~> 1.1.0)
|
||||||
connection_pool (2.0.0)
|
connection_pool (2.0.0)
|
||||||
|
dante (0.1.5)
|
||||||
delayed_job (4.0.1)
|
delayed_job (4.0.1)
|
||||||
activesupport (>= 3.0, < 4.2)
|
activesupport (>= 3.0, < 4.2)
|
||||||
i18n (0.6.9)
|
i18n (0.6.9)
|
||||||
@ -92,6 +97,7 @@ PLATFORMS
|
|||||||
|
|
||||||
DEPENDENCIES
|
DEPENDENCIES
|
||||||
activejob!
|
activejob!
|
||||||
|
backburner
|
||||||
delayed_job
|
delayed_job
|
||||||
que
|
que
|
||||||
queue_classic
|
queue_classic
|
||||||
|
4
Rakefile
4
Rakefile
@ -20,11 +20,11 @@ task :default => :test
|
|||||||
|
|
||||||
desc 'Run all adapter tests'
|
desc 'Run all adapter tests'
|
||||||
task :test do
|
task :test do
|
||||||
tasks = %w(test_inline test_delayed_job test_que test_queue_classic test_resque test_sidekiq test_sneakers test_sucker_punch)
|
tasks = %w(test_inline test_delayed_job test_que test_queue_classic test_resque test_sidekiq test_sneakers test_sucker_punch test_backburner)
|
||||||
run_without_aborting(*tasks)
|
run_without_aborting(*tasks)
|
||||||
end
|
end
|
||||||
|
|
||||||
%w(inline delayed_job que queue_classic resque sidekiq sneakers sucker_punch).each do |adapter|
|
%w(inline delayed_job que queue_classic resque sidekiq sneakers sucker_punch backburner).each do |adapter|
|
||||||
Rake::TestTask.new("test_#{adapter}") do |t|
|
Rake::TestTask.new("test_#{adapter}") do |t|
|
||||||
t.libs << 'test'
|
t.libs << 'test'
|
||||||
t.test_files = FileList['test/cases/**/*_test.rb']
|
t.test_files = FileList['test/cases/**/*_test.rb']
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
module ActiveJob
|
module ActiveJob
|
||||||
module Enqueuing
|
module Enqueuing
|
||||||
##
|
#
|
||||||
# Push a job onto the queue. The arguments must be legal JSON types
|
# Push a job onto the queue. The arguments must be legal JSON types
|
||||||
# (string, int, float, nil, true, false, hash or array) or
|
# (string, int, float, nil, true, false, hash or array) or
|
||||||
# ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects
|
# ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects
|
||||||
@ -11,11 +11,12 @@ module Enqueuing
|
|||||||
# The return value is adapter-specific and may change in a future
|
# The return value is adapter-specific and may change in a future
|
||||||
# ActiveJob release.
|
# ActiveJob release.
|
||||||
def enqueue(*args)
|
def enqueue(*args)
|
||||||
ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, params: args
|
serialized_args = Parameters.serialize(args)
|
||||||
queue_adapter.queue self, *Parameters.serialize(args)
|
ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: queue_adapter, job: self, args: serialized_args
|
||||||
|
queue_adapter.queue self, *serialized_args
|
||||||
end
|
end
|
||||||
|
|
||||||
##
|
#
|
||||||
# Enqueue a job to be performed at +interval+ from now.
|
# Enqueue a job to be performed at +interval+ from now.
|
||||||
#
|
#
|
||||||
# enqueue_in(1.week, "mike")
|
# enqueue_in(1.week, "mike")
|
||||||
@ -25,7 +26,7 @@ def enqueue_in(interval, *args)
|
|||||||
enqueue_at(interval.from_now, *args)
|
enqueue_at(interval.from_now, *args)
|
||||||
end
|
end
|
||||||
|
|
||||||
##
|
#
|
||||||
# Enqueue a job to be performed at an explicit point in time.
|
# Enqueue a job to be performed at an explicit point in time.
|
||||||
#
|
#
|
||||||
# enqueue_at(Date.tomorrow.midnight, "mike")
|
# enqueue_at(Date.tomorrow.midnight, "mike")
|
||||||
@ -33,12 +34,8 @@ def enqueue_in(interval, *args)
|
|||||||
# Returns truthy if a job was scheduled.
|
# Returns truthy if a job was scheduled.
|
||||||
def enqueue_at(timestamp, *args)
|
def enqueue_at(timestamp, *args)
|
||||||
ts = timestamp.to_f
|
ts = timestamp.to_f
|
||||||
ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, timestamp: ts, job: self, params: args
|
ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: queue_adapter, timestamp: ts, job: self, args: args
|
||||||
if Time.now.to_f > ts
|
queue_adapter.queue_at self, ts, *Parameters.serialize(args)
|
||||||
queue_adapter.queue self, *Parameters.serialize(args)
|
|
||||||
else
|
|
||||||
queue_adapter.queue_at self, ts, *Parameters.serialize(args)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -1,31 +1,22 @@
|
|||||||
|
require 'active_support/core_ext/string/filters'
|
||||||
|
|
||||||
module ActiveJob
|
module ActiveJob
|
||||||
class LogSubscriber < ActiveSupport::LogSubscriber
|
class LogSubscriber < ActiveSupport::LogSubscriber
|
||||||
def enqueue(event)
|
def enqueue(event)
|
||||||
payload = event.payload
|
queue_name = event.payload[:adapter].name.demodulize.remove('Adapter')
|
||||||
params = payload[:params]
|
job_name = event.payload[:job].name
|
||||||
adapter = payload[:adapter]
|
args = event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : ""
|
||||||
job = payload[:job]
|
|
||||||
|
|
||||||
info "ActiveJob enqueued to #{adapter.name.demodulize} job #{job.name}: #{params.inspect}"
|
info "Enqueued #{job_name} to #{queue_name}" + args
|
||||||
end
|
end
|
||||||
|
|
||||||
def enqueue_at(event)
|
def enqueue_at(event)
|
||||||
payload = event.payload
|
queue_name = event.payload[:adapter].name.demodulize.remove('Adapter')
|
||||||
params = payload[:params]
|
job_name = event.payload[:job].name
|
||||||
adapter = payload[:adapter]
|
args = event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : ""
|
||||||
job = payload[:job]
|
time = event.payload[:timestamp]
|
||||||
time = payload[:timestamp]
|
|
||||||
|
|
||||||
info "ActiveJob enqueued at #{time} to #{adapter.name.demodulize} job #{job.name}: #{params.inspect}"
|
info "Enqueued #{job_name} to #{queue_name} at #{time}" + args
|
||||||
end
|
|
||||||
|
|
||||||
def perform_error(event)
|
|
||||||
payload = event.payload
|
|
||||||
params = payload[:params]
|
|
||||||
job = payload[:job]
|
|
||||||
error = payload[:error]
|
|
||||||
|
|
||||||
warn "ActiveJob caught error executing #{job} with #{params.inspect}: #{error.message}"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def logger
|
def logger
|
||||||
|
25
lib/active_job/queue_adapters/backburner_adapter.rb
Normal file
25
lib/active_job/queue_adapters/backburner_adapter.rb
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
require 'backburner'
|
||||||
|
|
||||||
|
module ActiveJob
|
||||||
|
module QueueAdapters
|
||||||
|
class BackburnerAdapter
|
||||||
|
class << self
|
||||||
|
def queue(job, *args)
|
||||||
|
Backburner::Worker.enqueue JobWrapper, [ job.name, *args ], queue: job.queue_name
|
||||||
|
end
|
||||||
|
|
||||||
|
def queue_at(job, timestamp, *args)
|
||||||
|
raise NotImplementedError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class JobWrapper
|
||||||
|
class << self
|
||||||
|
def perform(job_name, *args)
|
||||||
|
job_name.constantize.new.perform *Parameters.deserialize(args)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
@ -14,7 +14,7 @@ def queue_at(job, ts, *args)
|
|||||||
sleep(interval) if interval > 0
|
sleep(interval) if interval > 0
|
||||||
job.new.perform *Parameters.deserialize(args)
|
job.new.perform *Parameters.deserialize(args)
|
||||||
rescue => ex
|
rescue => ex
|
||||||
ActiveSupport::Notifications.instrument "perform_error.active_job", adapter: self, job: job, params: args, error: ex
|
ActiveJob::Base.logger "Error performing #{job}: #{ex.message}"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -5,12 +5,20 @@ module QueueAdapters
|
|||||||
class SidekiqAdapter
|
class SidekiqAdapter
|
||||||
class << self
|
class << self
|
||||||
def queue(job, *args)
|
def queue(job, *args)
|
||||||
JobWrapper.client_push class: JobWrapper, queue: job.queue_name, args: [ job, *args ]
|
Sidekiq::Client.push \
|
||||||
|
'class' => JobWrapper,
|
||||||
|
'queue' => job.queue_name,
|
||||||
|
'args' => [ job, *args ],
|
||||||
|
'retry' => true
|
||||||
end
|
end
|
||||||
|
|
||||||
def queue_at(job, timestamp, *args)
|
def queue_at(job, timestamp, *args)
|
||||||
job = { class: JobWrapper, queue: job.queue_name, args: [ job, *args ], at: timestamp }
|
Sidekiq::Client.push \
|
||||||
JobWrapper.client_push(job)
|
'class' => JobWrapper,
|
||||||
|
'queue' => job.queue_name,
|
||||||
|
'args' => [ job, *args ],
|
||||||
|
'at' => timestamp,
|
||||||
|
'retry' => true
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -1,11 +1,17 @@
|
|||||||
require 'sneakers'
|
require 'sneakers'
|
||||||
|
require 'thread'
|
||||||
|
|
||||||
module ActiveJob
|
module ActiveJob
|
||||||
module QueueAdapters
|
module QueueAdapters
|
||||||
class SneakersAdapter
|
class SneakersAdapter
|
||||||
|
@mutex = Mutex.new
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
def queue(job, *args)
|
def queue(job, *args)
|
||||||
JobWrapper.enqueue([job, *args])
|
@mutex.synchronize do
|
||||||
|
JobWrapper.from_queue job.queue_name
|
||||||
|
JobWrapper.enqueue [ job, *args ]
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def queue_at(job, timestamp, *args)
|
def queue_at(job, timestamp, *args)
|
||||||
@ -16,8 +22,6 @@ def queue_at(job, timestamp, *args)
|
|||||||
class JobWrapper
|
class JobWrapper
|
||||||
include Sneakers::Worker
|
include Sneakers::Worker
|
||||||
|
|
||||||
self.from_queue("queue", {})
|
|
||||||
|
|
||||||
def work(job, *args)
|
def work(job, *args)
|
||||||
job.new.perform *Parameters.deserialize(args)
|
job.new.perform *Parameters.deserialize(args)
|
||||||
end
|
end
|
||||||
|
@ -9,7 +9,12 @@ def queue(job, *args)
|
|||||||
end
|
end
|
||||||
|
|
||||||
def queue_at(job, timestamp, *args)
|
def queue_at(job, timestamp, *args)
|
||||||
JobWrapper.new.async.later(secs, job, *args)
|
delay = Time.now.to_f - timestamp
|
||||||
|
if delay > 0
|
||||||
|
JobWrapper.new.async.later(delay, job, *args)
|
||||||
|
else
|
||||||
|
JobWrapper.new.async.perform(job, *args)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
3
test/adapters/backburner.rb
Normal file
3
test/adapters/backburner.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
require 'support/backburner/inline'
|
||||||
|
|
||||||
|
ActiveJob::Base.queue_adapter = :backburner
|
@ -43,4 +43,9 @@ class AdapterTest < ActiveSupport::TestCase
|
|||||||
ActiveJob::Base.queue_adapter = :sneakers
|
ActiveJob::Base.queue_adapter = :sneakers
|
||||||
assert_equal ActiveJob::QueueAdapters::SneakersAdapter, ActiveJob::Base.queue_adapter
|
assert_equal ActiveJob::QueueAdapters::SneakersAdapter, ActiveJob::Base.queue_adapter
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test 'should load Backburner adapter' do
|
||||||
|
ActiveJob::Base.queue_adapter = :backburner
|
||||||
|
assert_equal ActiveJob::QueueAdapters::BackburnerAdapter, ActiveJob::Base.queue_adapter
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
@ -7,3 +7,5 @@
|
|||||||
require "adapters/#{ENV['AJADAPTER'] || 'inline'}"
|
require "adapters/#{ENV['AJADAPTER'] || 'inline'}"
|
||||||
|
|
||||||
require 'active_support/testing/autorun'
|
require 'active_support/testing/autorun'
|
||||||
|
|
||||||
|
ActiveJob::Logging.logger.level = Logger::ERROR
|
||||||
|
8
test/support/backburner/inline.rb
Normal file
8
test/support/backburner/inline.rb
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
require 'backburner'
|
||||||
|
|
||||||
|
Backburner::Worker.class_eval do
|
||||||
|
class << self; alias_method :original_enqueue, :enqueue; end
|
||||||
|
def self.enqueue(job_class, args=[], opts={})
|
||||||
|
job_class.perform(*args)
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue
Block a user