commit
add626e365
@ -3,19 +3,19 @@
|
||||
module ActiveJob
|
||||
module Enqueuing
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
|
||||
module ClassMethods
|
||||
# Push a job onto the queue. The arguments must be legal JSON types
|
||||
# (string, int, float, nil, true, false, hash or array) or
|
||||
# ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects
|
||||
# are not supported.
|
||||
#
|
||||
# Returns an instance of the job class queued with args available in
|
||||
# Returns an instance of the job class queued with args available in
|
||||
# Job#arguments.
|
||||
def enqueue(*args)
|
||||
new(args).tap do |job|
|
||||
job.run_callbacks :enqueue do
|
||||
queue_adapter.enqueue self, *Arguments.serialize(args)
|
||||
queue_adapter.enqueue self, job.job_id, *Arguments.serialize(args)
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -24,7 +24,7 @@ def enqueue(*args)
|
||||
#
|
||||
# enqueue_in(1.week, "mike")
|
||||
#
|
||||
# Returns an instance of the job class queued with args available in
|
||||
# Returns an instance of the job class queued with args available in
|
||||
# Job#arguments and the timestamp in Job#enqueue_at.
|
||||
def enqueue_in(interval, *args)
|
||||
enqueue_at interval.seconds.from_now, *args
|
||||
@ -34,19 +34,19 @@ def enqueue_in(interval, *args)
|
||||
#
|
||||
# enqueue_at(Date.tomorrow.midnight, "mike")
|
||||
#
|
||||
# Returns an instance of the job class queued with args available in
|
||||
# Returns an instance of the job class queued with args available in
|
||||
# Job#arguments and the timestamp in Job#enqueue_at.
|
||||
def enqueue_at(timestamp, *args)
|
||||
new(args).tap do |job|
|
||||
job.enqueued_at = timestamp
|
||||
|
||||
job.run_callbacks :enqueue do
|
||||
queue_adapter.enqueue_at self, timestamp.to_f, *Arguments.serialize(args)
|
||||
queue_adapter.enqueue_at self, timestamp.to_f, job.job_id, *Arguments.serialize(args)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
included do
|
||||
attr_accessor :arguments
|
||||
attr_accessor :enqueued_at
|
||||
@ -55,11 +55,11 @@ def enqueue_at(timestamp, *args)
|
||||
def initialize(arguments = nil)
|
||||
@arguments = arguments
|
||||
end
|
||||
|
||||
|
||||
def retry_now
|
||||
self.class.enqueue *arguments
|
||||
end
|
||||
|
||||
|
||||
def retry_in(interval)
|
||||
self.class.enqueue_in interval, *arguments
|
||||
end
|
||||
|
@ -4,12 +4,13 @@
|
||||
module ActiveJob
|
||||
module Execution
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
|
||||
included do
|
||||
include ActiveSupport::Rescuable
|
||||
end
|
||||
|
||||
def execute(*serialized_args)
|
||||
def execute(job_id, *serialized_args)
|
||||
self.job_id = job_id
|
||||
self.arguments = Arguments.deserialize(serialized_args)
|
||||
|
||||
run_callbacks :perform do
|
||||
|
@ -26,10 +26,10 @@ module Logging
|
||||
before_enqueue do |job|
|
||||
if job.enqueued_at
|
||||
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
|
||||
adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at
|
||||
adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments, timestamp: job.enqueued_at
|
||||
else
|
||||
ActiveSupport::Notifications.instrument "enqueue.active_job",
|
||||
adapter: job.class.queue_adapter, job: job.class, args: job.arguments
|
||||
adapter: job.class.queue_adapter, job: job.class, job_id: job.job_id, args: job.arguments
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -50,11 +50,11 @@ def logger_tagged_by_active_job?
|
||||
|
||||
class LogSubscriber < ActiveSupport::LogSubscriber
|
||||
def enqueue(event)
|
||||
info "Enqueued #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event)
|
||||
info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)}" + args_info(event)
|
||||
end
|
||||
|
||||
def enqueue_at(event)
|
||||
info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event)
|
||||
info "Enqueued #{event.payload[:job].name} (Job ID: #{event.payload[:job_id]}) to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event)
|
||||
end
|
||||
|
||||
def perform_start(event)
|
||||
@ -67,7 +67,7 @@ def perform(event)
|
||||
|
||||
private
|
||||
def queue_name(event)
|
||||
event.payload[:adapter].name.demodulize.remove('Adapter')
|
||||
event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
|
||||
end
|
||||
|
||||
def args_info(event)
|
||||
|
@ -8,11 +8,11 @@ module QueueAdapters
|
||||
class ResqueAdapter
|
||||
class << self
|
||||
def enqueue(job, *args)
|
||||
Resque.enqueue JobWrapper.new(job), job, *args
|
||||
Resque.enqueue_to job.queue_name, JobWrapper, job.name, *args
|
||||
end
|
||||
|
||||
def enqueue_at(job, timestamp, *args)
|
||||
Resque.enqueue_at timestamp, JobWrapper.new(job), job, *args
|
||||
Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.name, *args
|
||||
end
|
||||
end
|
||||
|
||||
@ -22,14 +22,6 @@ def perform(job_name, *args)
|
||||
job_name.constantize.new.execute *args
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(job)
|
||||
@queue = job.queue_name
|
||||
end
|
||||
|
||||
def to_s
|
||||
self.class.name
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
1
lib/activejob.rb
Normal file
1
lib/activejob.rb
Normal file
@ -0,0 +1 @@
|
||||
require 'active_job'
|
@ -5,7 +5,7 @@
|
||||
|
||||
class CallbacksTest < ActiveSupport::TestCase
|
||||
test 'perform callbacks' do
|
||||
performed_callback_job = CallbackJob.new.tap { |j| j.execute }
|
||||
performed_callback_job = CallbackJob.new.tap { |j| j.execute("A-JOB-ID") }
|
||||
assert "CallbackJob ran before_perform".in? performed_callback_job.history
|
||||
assert "CallbackJob ran after_perform".in? performed_callback_job.history
|
||||
assert "CallbackJob ran around_perform_start".in? performed_callback_job.history
|
||||
|
@ -46,7 +46,7 @@ def test_uses_active_job_as_tag
|
||||
|
||||
def test_enqueue_job_logging
|
||||
HelloJob.enqueue "Cristian"
|
||||
assert_match(/Enqueued HelloJob to .*?:.*Cristian/, @logger.messages)
|
||||
assert_match(/Enqueued HelloJob \(Job ID: .*?\) to .*?:.*Cristian/, @logger.messages)
|
||||
end
|
||||
|
||||
def test_perform_job_logging
|
||||
@ -69,9 +69,9 @@ def test_perform_uses_job_id_job_logging
|
||||
def test_perform_nested_jobs_logging
|
||||
NestedJob.enqueue
|
||||
assert_match(/\[LoggingJob\] \[.*?\]/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\] Enqueued NestedJob to/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\] Enqueued NestedJob \(Job ID: .*\) to/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Performing NestedJob from/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Enqueued LoggingJob to .* with arguments: "NestedJob"/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\] \[NestedJob\] \[NESTED-JOB-ID\] Enqueued LoggingJob \(Job ID: .*?\) to .* with arguments: "NestedJob"/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Performing LoggingJob from .* with arguments: "NestedJob"/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Dummy, here is it: NestedJob/, @logger.messages)
|
||||
assert_match(/\[ActiveJob\].*\[LoggingJob\] \[LOGGING-JOB-ID\] Performed LoggingJob from .* in/, @logger.messages)
|
||||
@ -80,14 +80,14 @@ def test_perform_nested_jobs_logging
|
||||
|
||||
def test_enqueue_at_job_logging
|
||||
HelloJob.enqueue_at 1, "Cristian"
|
||||
assert_match(/Enqueued HelloJob to .*? at.*Cristian/, @logger.messages)
|
||||
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
|
||||
rescue NotImplementedError
|
||||
skip
|
||||
end
|
||||
|
||||
def test_enqueue_in_job_logging
|
||||
HelloJob.enqueue_in 2, "Cristian"
|
||||
assert_match(/Enqueued HelloJob to .*? at.*Cristian/, @logger.messages)
|
||||
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
|
||||
rescue NotImplementedError
|
||||
skip
|
||||
end
|
||||
|
@ -7,17 +7,17 @@ class RescueTest < ActiveSupport::TestCase
|
||||
setup do
|
||||
$BUFFER = []
|
||||
end
|
||||
|
||||
|
||||
test 'rescue perform exception with retry' do
|
||||
job = RescueJob.new
|
||||
job.execute("david")
|
||||
job.execute(SecureRandom.uuid, "david")
|
||||
assert_equal [ "rescued from ArgumentError", "performed beautifully" ], $BUFFER
|
||||
end
|
||||
|
||||
test 'let through unhandled perform exception' do
|
||||
job = RescueJob.new
|
||||
assert_raises(RescueJob::OtherError) do
|
||||
job.execute("other")
|
||||
job.execute(SecureRandom.uuid, "other")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user