aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/active_job/base.rb7
-rw-r--r--lib/active_job/callbacks.rb40
-rw-r--r--lib/active_job/enqueuing.rb82
-rw-r--r--lib/active_job/logging.rb25
-rw-r--r--lib/active_job/performing.rb14
-rw-r--r--lib/active_job/queue_adapters/backburner_adapter.rb2
-rw-r--r--lib/active_job/queue_adapters/delayed_job_adapter.rb2
-rw-r--r--lib/active_job/queue_adapters/inline_adapter.rb4
-rw-r--r--lib/active_job/queue_adapters/que_adapter.rb2
-rw-r--r--lib/active_job/queue_adapters/queue_classic_adapter.rb2
-rw-r--r--lib/active_job/queue_adapters/resque_adapter.rb2
-rw-r--r--lib/active_job/queue_adapters/sidekiq_adapter.rb2
-rw-r--r--lib/active_job/queue_adapters/sneakers_adapter.rb2
-rw-r--r--lib/active_job/queue_adapters/sucker_punch_adapter.rb2
-rw-r--r--test/cases/callbacks_test.rb23
-rw-r--r--test/cases/queuing_test.rb15
-rw-r--r--test/jobs/callback_job.rb32
17 files changed, 204 insertions, 54 deletions
diff --git a/lib/active_job/base.rb b/lib/active_job/base.rb
index e6b02708a1..1b88bc5bcc 100644
--- a/lib/active_job/base.rb
+++ b/lib/active_job/base.rb
@@ -3,14 +3,17 @@ require 'active_job/queue_name'
require 'active_job/enqueuing'
require 'active_job/performing'
require 'active_job/logging'
+require 'active_job/callbacks'
module ActiveJob
class Base
extend QueueAdapter
extend QueueName
- extend Enqueuing
+
+ include Enqueuing
include Performing
- extend Logging
+ include Callbacks
+ include Logging
ActiveSupport.run_load_hooks(:active_job, self)
end
diff --git a/lib/active_job/callbacks.rb b/lib/active_job/callbacks.rb
new file mode 100644
index 0000000000..c69e4a3b55
--- /dev/null
+++ b/lib/active_job/callbacks.rb
@@ -0,0 +1,40 @@
+require 'active_support/callbacks'
+
+module ActiveJob
+ module Callbacks
+ extend ActiveSupport::Concern
+ include ActiveSupport::Callbacks
+
+ included do
+ define_callbacks :perform
+ define_callbacks :enqueue
+ end
+
+ module ClassMethods
+ def before_perform(*filters, &blk)
+ set_callback(:perform, :before, *filters, &blk)
+ end
+
+ def after_perform(*filters, &blk)
+ set_callback(:perform, :after, *filters, &blk)
+ end
+
+ def around_perform(*filters, &blk)
+ set_callback(:perform, :around, *filters, &blk)
+ end
+
+
+ def before_enqueue(*filters, &blk)
+ set_callback(:enqueue, :before, *filters, &blk)
+ end
+
+ def after_enqueue(*filters, &blk)
+ set_callback(:enqueue, :after, *filters, &blk)
+ end
+
+ def around_enqueue(*filters, &blk)
+ set_callback(:enqueue, :around, *filters, &blk)
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/lib/active_job/enqueuing.rb b/lib/active_job/enqueuing.rb
index e8f3272782..8b1f29ce77 100644
--- a/lib/active_job/enqueuing.rb
+++ b/lib/active_job/enqueuing.rb
@@ -2,42 +2,58 @@ require 'active_job/parameters'
module ActiveJob
module Enqueuing
- # Push a job onto the queue. The arguments must be legal JSON types
- # (string, int, float, nil, true, false, hash or array) or
- # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects
- # are not supported.
- #
- # The return value is adapter-specific and may change in a future
- # ActiveJob release.
- def enqueue(*args)
- serialized_args = Parameters.serialize(args)
- instrument_enqueuing :enqueue, args: serialized_args
- queue_adapter.enqueue self, *serialized_args
- end
+ extend ActiveSupport::Concern
+
+ module ClassMethods
+ # Push a job onto the queue. The arguments must be legal JSON types
+ # (string, int, float, nil, true, false, hash or array) or
+ # ActiveModel::GlobalIdentication instances. Arbitrary Ruby objects
+ # are not supported.
+ #
+ # Returns an instance of the job class queued with args available in
+ # Job#arguments.
+ def enqueue(*args)
+ new(args).tap do |job|
+ job.run_callbacks :enqueue do
+ queue_adapter.enqueue self, *Parameters.serialize(args)
+ end
+ end
+ end
- # Enqueue a job to be performed at +interval+ from now.
- #
- # enqueue_in(1.week, "mike")
- #
- # Returns truthy if a job was scheduled.
- def enqueue_in(interval, *args)
- enqueue_at(interval.seconds.from_now, *args)
- end
+ # Enqueue a job to be performed at +interval+ from now.
+ #
+ # enqueue_in(1.week, "mike")
+ #
+ # Returns an instance of the job class queued with args available in
+ # Job#arguments and the timestamp in Job#enqueue_at.
+ def enqueue_in(interval, *args)
+ enqueue_at interval.seconds.from_now, *args
+ end
+
+ # Enqueue a job to be performed at an explicit point in time.
+ #
+ # enqueue_at(Date.tomorrow.midnight, "mike")
+ #
+ # Returns an instance of the job class queued with args available in
+ # Job#arguments and the timestamp in Job#enqueue_at.
+ def enqueue_at(timestamp, *args)
+ new(args).tap do |job|
+ job.enqueued_at = timestamp
- # Enqueue a job to be performed at an explicit point in time.
- #
- # enqueue_at(Date.tomorrow.midnight, "mike")
- #
- # Returns truthy if a job was scheduled.
- def enqueue_at(timestamp, *args)
- serialized_args = Parameters.serialize(args)
- instrument_enqueuing :enqueue_at, args: serialized_args, timestamp: timestamp
- queue_adapter.enqueue_at self, timestamp.to_f, *serialized_args
+ job.run_callbacks :enqueue do
+ queue_adapter.enqueue_at self, timestamp.to_f, *Parameters.serialize(args)
+ end
+ end
+ end
end
- private
- def instrument_enqueuing(method_name, options = {})
- ActiveSupport::Notifications.instrument "#{method_name}.active_job", options.merge(adapter: queue_adapter, job: self)
- end
+ included do
+ attr_accessor :arguments
+ attr_accessor :enqueued_at
+ end
+
+ def initialize(arguments = nil)
+ @arguments = arguments
+ end
end
end
diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb
index f3cc599a14..f4a33ffe19 100644
--- a/lib/active_job/logging.rb
+++ b/lib/active_job/logging.rb
@@ -2,7 +2,29 @@ require 'active_support/core_ext/string/filters'
module ActiveJob
module Logging
- mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) }
+ extend ActiveSupport::Concern
+
+ module ClassMethods
+ mattr_accessor(:logger) { ActiveSupport::Logger.new(STDOUT) }
+ end
+
+ included do
+ before_enqueue do |job|
+ if job.enqueued_at
+ ActiveSupport::Notifications.instrument "enqueue_at.active_job",
+ adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at
+ else
+ ActiveSupport::Notifications.instrument "enqueue.active_job",
+ adapter: job.class.queue_adapter, job: job.class, args: job.arguments
+ end
+ end
+
+ before_perform do |job|
+ ActiveSupport::Notifications.instrument "perform.active_job",
+ adapter: job.class.queue_adapter, job: job.class, args: job.arguments
+ end
+ end
+
class LogSubscriber < ActiveSupport::LogSubscriber
def enqueue(event)
@@ -17,6 +39,7 @@ module ActiveJob
info "Performed #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event)
end
+
private
def queue_name(event)
event.payload[:adapter].name.demodulize.remove('Adapter')
diff --git a/lib/active_job/performing.rb b/lib/active_job/performing.rb
index eca311578d..126193995c 100644
--- a/lib/active_job/performing.rb
+++ b/lib/active_job/performing.rb
@@ -2,18 +2,16 @@ require 'active_job/parameters'
module ActiveJob
module Performing
- def perform_with_deserialization(*serialized_args)
- instrument_performing serialized_args
- perform *Parameters.deserialize(serialized_args)
+ def perform_with_hooks(*serialized_args)
+ self.arguments = Parameters.deserialize(serialized_args)
+
+ run_callbacks :perform do
+ perform *arguments
+ end
end
def perform(*)
raise NotImplementedError
end
-
- private
- def instrument_performing(args)
- ActiveSupport::Notifications.instrument "perform.active_job", adapter: self.class.queue_adapter, job: self.class, args: args
- end
end
end
diff --git a/lib/active_job/queue_adapters/backburner_adapter.rb b/lib/active_job/queue_adapters/backburner_adapter.rb
index 5230acc625..b7e963cd6f 100644
--- a/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -16,7 +16,7 @@ module ActiveJob
class JobWrapper
class << self
def perform(job_name, *args)
- job_name.constantize.new.perform_with_deserialization *args
+ job_name.constantize.new.perform_with_hooks *args
end
end
end
diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb
index 5a9c4c708d..fa08d779fb 100644
--- a/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -15,7 +15,7 @@ module ActiveJob
class JobWrapper
def perform(job, *args)
- job.new.perform_with_deserialization *args
+ job.new.perform_with_hooks *args
end
end
end
diff --git a/lib/active_job/queue_adapters/inline_adapter.rb b/lib/active_job/queue_adapters/inline_adapter.rb
index d826ce51b4..8b82d7c25a 100644
--- a/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/lib/active_job/queue_adapters/inline_adapter.rb
@@ -3,7 +3,7 @@ module ActiveJob
class InlineAdapter
class << self
def enqueue(job, *args)
- job.new.perform_with_deserialization *args
+ job.new.perform_with_hooks *args
end
def enqueue_at(job, timestamp, *args)
@@ -11,7 +11,7 @@ module ActiveJob
begin
interval = Time.now.to_f - timestamp
sleep(interval) if interval > 0
- job.new.perform_with_deserialization *args
+ job.new.perform_with_hooks *args
rescue => e
ActiveJob::Base.logger.info "Error performing #{job}: #{e.message}"
end
diff --git a/lib/active_job/queue_adapters/que_adapter.rb b/lib/active_job/queue_adapters/que_adapter.rb
index 9dd57d65f3..adb9125666 100644
--- a/lib/active_job/queue_adapters/que_adapter.rb
+++ b/lib/active_job/queue_adapters/que_adapter.rb
@@ -15,7 +15,7 @@ module ActiveJob
class JobWrapper < Que::Job
def run(job, *args)
- job.new.perform_with_deserialization *args
+ job.new.perform_with_hooks *args
end
end
end
diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb
index eacc6b5548..01d6d30caf 100644
--- a/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -15,7 +15,7 @@ module ActiveJob
class JobWrapper
def self.perform(job, *args)
- job.new.perform_with_deserialization *args
+ job.new.perform_with_hooks *args
end
end
end
diff --git a/lib/active_job/queue_adapters/resque_adapter.rb b/lib/active_job/queue_adapters/resque_adapter.rb
index 3b87f25b80..99da3c63ce 100644
--- a/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/lib/active_job/queue_adapters/resque_adapter.rb
@@ -19,7 +19,7 @@ module ActiveJob
class JobWrapper
class << self
def perform(job_name, *args)
- job_name.constantize.new.perform_with_deserialization *args
+ job_name.constantize.new.perform_with_hooks *args
end
end
diff --git a/lib/active_job/queue_adapters/sidekiq_adapter.rb b/lib/active_job/queue_adapters/sidekiq_adapter.rb
index 74fbe632d6..2a2c2ce442 100644
--- a/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -26,7 +26,7 @@ module ActiveJob
include Sidekiq::Worker
def perform(job_name, *args)
- job_name.constantize.new.perform_with_deserialization *args
+ job_name.constantize.new.perform_with_hooks *args
end
end
end
diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb
index 6bb575e907..ebd794fca1 100644
--- a/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -23,7 +23,7 @@ module ActiveJob
include Sneakers::Worker
def work(job, *args)
- job.new.perform_with_deserialization *args
+ job.new.perform_with_hooks *args
end
end
end
diff --git a/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/lib/active_job/queue_adapters/sucker_punch_adapter.rb
index 30718fc05f..a166081f9f 100644
--- a/lib/active_job/queue_adapters/sucker_punch_adapter.rb
+++ b/lib/active_job/queue_adapters/sucker_punch_adapter.rb
@@ -17,7 +17,7 @@ module ActiveJob
include SuckerPunch::Job
def perform(job, *args)
- job.new.perform_with_deserialization *args
+ job.new.perform_with_hooks *args
end
end
end
diff --git a/test/cases/callbacks_test.rb b/test/cases/callbacks_test.rb
new file mode 100644
index 0000000000..391c7e87c4
--- /dev/null
+++ b/test/cases/callbacks_test.rb
@@ -0,0 +1,23 @@
+require 'helper'
+require 'active_job/parameters'
+require 'jobs/callback_job'
+
+require 'active_support/core_ext/object/inclusion'
+
+class CallbacksTest < ActiveSupport::TestCase
+ test 'perform callbacks' do
+ performed_callback_job = CallbackJob.new.tap { |j| j.perform_with_hooks }
+ assert "CallbackJob ran before_perform".in? performed_callback_job.history
+ assert "CallbackJob ran after_perform".in? performed_callback_job.history
+ assert "CallbackJob ran around_perform_start".in? performed_callback_job.history
+ assert "CallbackJob ran around_perform_stop".in? performed_callback_job.history
+ end
+
+ test 'enqueue callbacks' do
+ enqueued_callback_job = CallbackJob.enqueue
+ assert "CallbackJob ran before_enqueue".in? enqueued_callback_job.history
+ assert "CallbackJob ran after_enqueue".in? enqueued_callback_job.history
+ assert "CallbackJob ran around_enqueue_start".in? enqueued_callback_job.history
+ assert "CallbackJob ran around_enqueue_stop".in? enqueued_callback_job.history
+ end
+end
diff --git a/test/cases/queuing_test.rb b/test/cases/queuing_test.rb
index 23df35a8df..029f60f246 100644
--- a/test/cases/queuing_test.rb
+++ b/test/cases/queuing_test.rb
@@ -26,4 +26,19 @@ class QueuingTest < ActiveSupport::TestCase
skip
end
end
+
+ test 'job returned by enqueue has the arguments available' do
+ job = HelloJob.enqueue "Jamie"
+ assert_equal [ "Jamie" ], job.arguments
+ end
+
+
+ test 'job returned by enqueue_at has the timestamp available' do
+ begin
+ job = HelloJob.enqueue_at Time.utc(2014, 1, 1)
+ assert_equal Time.utc(2014, 1, 1), job.enqueued_at
+ rescue NotImplementedError
+ skip
+ end
+ end
end
diff --git a/test/jobs/callback_job.rb b/test/jobs/callback_job.rb
new file mode 100644
index 0000000000..056dd073e8
--- /dev/null
+++ b/test/jobs/callback_job.rb
@@ -0,0 +1,32 @@
+class CallbackJob < ActiveJob::Base
+ before_perform ->(job) { job.history << "CallbackJob ran before_perform" }
+ after_perform ->(job) { job.history << "CallbackJob ran after_perform" }
+
+ before_enqueue ->(job) { job.history << "CallbackJob ran before_enqueue" }
+ after_enqueue ->(job) { job.history << "CallbackJob ran after_enqueue" }
+
+ around_perform :around_perform
+ around_enqueue :around_enqueue
+
+
+ def perform(person = "david")
+ # NOTHING!
+ end
+
+ def history
+ @history ||= []
+ end
+
+ # FIXME: Not sure why these can't be declared inline like before/after
+ def around_perform
+ history << "CallbackJob ran around_perform_start"
+ yield
+ history << "CallbackJob ran around_perform_stop"
+ end
+
+ def around_enqueue
+ history << "CallbackJob ran around_enqueue_start"
+ yield
+ history << "CallbackJob ran around_enqueue_stop"
+ end
+end