aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib/active_job
diff options
context:
space:
mode:
Diffstat (limited to 'activejob/lib/active_job')
-rw-r--r--activejob/lib/active_job/arguments.rb56
-rw-r--r--activejob/lib/active_job/core.rb5
-rw-r--r--activejob/lib/active_job/enqueuing.rb1
-rw-r--r--activejob/lib/active_job/logging.rb2
-rw-r--r--activejob/lib/active_job/queue_adapter.rb58
-rw-r--r--activejob/lib/active_job/queue_adapters.rb84
-rw-r--r--activejob/lib/active_job/queue_adapters/backburner_adapter.rb14
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb16
-rw-r--r--activejob/lib/active_job/queue_adapters/inline_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/qu_adapter.rb20
-rw-r--r--activejob/lib/active_job/queue_adapters/que_adapter.rb16
-rw-r--r--activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb38
-rw-r--r--activejob/lib/active_job/queue_adapters/resque_adapter.rb18
-rw-r--r--activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb30
-rw-r--r--activejob/lib/active_job/queue_adapters/sneakers_adapter.rb20
-rw-r--r--activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb12
-rw-r--r--activejob/lib/active_job/queue_adapters/test_adapter.rb41
-rw-r--r--activejob/lib/active_job/test_helper.rb114
18 files changed, 391 insertions, 166 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb
index 752be6898e..cdb37879b6 100644
--- a/activejob/lib/active_job/arguments.rb
+++ b/activejob/lib/active_job/arguments.rb
@@ -1,10 +1,12 @@
-require 'active_support/core_ext/hash/indifferent_access'
+require 'active_support/core_ext/hash'
module ActiveJob
# Raised when an exception is raised during job arguments deserialization.
#
# Wraps the original exception raised as +original_exception+.
class DeserializationError < StandardError
+ # The original exception that was raised during deserialization of job
+ # arguments.
attr_reader :original_exception
def initialize(e) #:nodoc:
@@ -24,6 +26,7 @@ module ActiveJob
module Arguments
extend self
+ # :nodoc:
TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ]
# Serializes a set of arguments. Whitelisted types are returned
@@ -43,8 +46,13 @@ module ActiveJob
end
private
+ # :nodoc:
GLOBALID_KEY = '_aj_globalid'.freeze
- private_constant :GLOBALID_KEY
+ # :nodoc:
+ SYMBOL_KEYS_KEY = '_aj_symbol_keys'.freeze
+ # :nodoc:
+ WITH_INDIFFERENT_ACCESS_KEY = '_aj_hash_with_indifferent_access'.freeze
+ private_constant :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
def serialize_argument(argument)
case argument
@@ -54,10 +62,15 @@ module ActiveJob
{ GLOBALID_KEY => argument.to_global_id.to_s }
when Array
argument.map { |arg| serialize_argument(arg) }
+ when ActiveSupport::HashWithIndifferentAccess
+ result = serialize_hash(argument)
+ result[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true)
+ result
when Hash
- argument.each_with_object({}) do |(key, value), hash|
- hash[serialize_hash_key(key)] = serialize_argument(value)
- end
+ symbol_keys = argument.each_key.grep(Symbol).map(&:to_s)
+ result = serialize_hash(argument)
+ result[SYMBOL_KEYS_KEY] = symbol_keys
+ result
else
raise SerializationError.new("Unsupported argument type: #{argument.class.name}")
end
@@ -75,7 +88,7 @@ module ActiveJob
if serialized_global_id?(argument)
deserialize_global_id argument
else
- deserialize_hash argument
+ deserialize_hash(argument)
end
else
raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
@@ -90,13 +103,28 @@ module ActiveJob
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
+ def serialize_hash(argument)
+ argument.each_with_object({}) do |(key, value), hash|
+ hash[serialize_hash_key(key)] = serialize_argument(value)
+ end
+ end
+
def deserialize_hash(serialized_hash)
- serialized_hash.each_with_object({}.with_indifferent_access) do |(key, value), hash|
- hash[key] = deserialize_argument(value)
+ result = serialized_hash.transform_values { |v| deserialize_argument(v) }
+ if result.delete(WITH_INDIFFERENT_ACCESS_KEY)
+ result = result.with_indifferent_access
+ elsif symbol_keys = result.delete(SYMBOL_KEYS_KEY)
+ result = transform_symbol_keys(result, symbol_keys)
end
+ result
end
- RESERVED_KEYS = [GLOBALID_KEY, GLOBALID_KEY.to_sym]
+ # :nodoc:
+ RESERVED_KEYS = [
+ GLOBALID_KEY, GLOBALID_KEY.to_sym,
+ SYMBOL_KEYS_KEY, SYMBOL_KEYS_KEY.to_sym,
+ WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym,
+ ]
private_constant :RESERVED_KEYS
def serialize_hash_key(key)
@@ -109,5 +137,15 @@ module ActiveJob
raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}")
end
end
+
+ def transform_symbol_keys(hash, symbol_keys)
+ hash.transform_keys do |key|
+ if symbol_keys.include?(key)
+ key.to_sym
+ else
+ key
+ end
+ end
+ end
end
end
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index ddd7d1361c..0528572cd0 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -1,4 +1,6 @@
module ActiveJob
+ # Provides general behavior that will be included into every Active Job
+ # object that inherits from ActiveJob::Base.
module Core
extend ActiveSupport::Concern
@@ -15,6 +17,9 @@ module ActiveJob
# Queue in which the job will reside.
attr_writer :queue_name
+
+ # ID optionally provided by adapter
+ attr_accessor :provider_job_id
end
# These methods will be included into any Active Job object, adding
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 430c17e1bf..98d92385dd 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -1,6 +1,7 @@
require 'active_job/arguments'
module ActiveJob
+ # Provides behavior for enqueuing and retrying jobs.
module Enqueuing
extend ActiveSupport::Concern
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index cd29e6908e..54774db601 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -81,7 +81,7 @@ module ActiveJob
private
def queue_name(event)
- event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
+ event.payload[:adapter].class.name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
end
def args_info(job)
diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb
index d610d30e01..457015b741 100644
--- a/activejob/lib/active_job/queue_adapter.rb
+++ b/activejob/lib/active_job/queue_adapter.rb
@@ -1,35 +1,63 @@
require 'active_job/queue_adapters/inline_adapter'
+require 'active_support/core_ext/class/attribute'
require 'active_support/core_ext/string/inflections'
module ActiveJob
# The <tt>ActiveJob::QueueAdapter</tt> module is used to load the
- # correct adapter. The default queue adapter is the :inline queue.
+ # correct adapter. The default queue adapter is the +:inline+ queue.
module QueueAdapter #:nodoc:
extend ActiveSupport::Concern
+ included do
+ class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
+ self.queue_adapter = :inline
+ end
+
# Includes the setter method for changing the active queue adapter.
module ClassMethods
- mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter }
+ # Returns the backend queue provider. The default queue adapter
+ # is the +:inline+ queue. See QueueAdapters for more information.
+ def queue_adapter
+ _queue_adapter
+ end
# Specify the backend queue provider. The default queue adapter
- # is the :inline queue. See QueueAdapters for more
+ # is the +:inline+ queue. See QueueAdapters for more
# information.
- def queue_adapter=(name_or_adapter)
- @@queue_adapter = \
- case name_or_adapter
- when :test
- ActiveJob::QueueAdapters::TestAdapter.new
- when Symbol, String
- load_adapter(name_or_adapter)
- else
- name_or_adapter if name_or_adapter.respond_to?(:enqueue)
- end
+ def queue_adapter=(name_or_adapter_or_class)
+ self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
end
private
- def load_adapter(name)
- "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize
+
+ def interpret_adapter(name_or_adapter_or_class)
+ case name_or_adapter_or_class
+ when Symbol, String
+ ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
+ else
+ if queue_adapter?(name_or_adapter_or_class)
+ name_or_adapter_or_class
+ elsif queue_adapter_class?(name_or_adapter_or_class)
+ ActiveSupport::Deprecation.warn "Passing an adapter class is deprecated " \
+ "and will be removed in Rails 5.1. Please pass an adapter name " \
+ "(.queue_adapter = :#{name_or_adapter_or_class.name.demodulize.remove('Adapter').underscore}) " \
+ "or an instance (.queue_adapter = #{name_or_adapter_or_class.name}.new) instead."
+ name_or_adapter_or_class.new
+ else
+ raise ArgumentError
+ end
end
+ end
+
+ QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze
+
+ def queue_adapter?(object)
+ QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
+ end
+
+ def queue_adapter_class?(object)
+ object.is_a?(Class) && QUEUE_ADAPTER_METHODS.all? { |meth| object.public_method_defined?(meth) }
+ end
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index 4b91c93dbe..1335e3236e 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -27,13 +27,76 @@ module ActiveJob
# | Sneakers | Yes | Yes | No | Queue | Queue | No |
# | Sucker Punch | Yes | Yes | No | No | No | No |
# | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
- # | Active Job | Yes | Yes | Yes | No | No | No |
+ #
+ # ==== Async
+ #
+ # Yes: The Queue Adapter runs the jobs in a separate or forked process.
+ #
+ # No: The job is run in the same process.
+ #
+ # ==== Queues
+ #
+ # Yes: Jobs may set which queue they are run in with queue_as or by using the set
+ # method.
+ #
+ # ==== Delayed
+ #
+ # Yes: The adapter will run the job in the future through perform_later.
+ #
+ # (Gem): An additional gem is required to use perform_later with this adapter.
+ #
+ # No: The adapter will run jobs at the next opportunity and cannot use perform_later.
+ #
+ # N/A: The adapter does not support queueing.
#
# NOTE:
- # queue_classic does not support Job scheduling. However you can implement this
- # yourself or you can use the queue_classic-later gem. See the documentation for
- # ActiveJob::QueueAdapters::QueueClassicAdapter.
+ # queue_classic does not support job scheduling.
+ # However, you can use the queue_classic-later gem.
+ # See the documentation for ActiveJob::QueueAdapters::QueueClassicAdapter.
+ #
+ # ==== Priorities
+ #
+ # The order in which jobs are processed can be configured differently depending
+ # on the adapter.
+ #
+ # Job: Any class inheriting from the adapter may set the priority on the job
+ # object relative to other jobs.
+ #
+ # Queue: The adapter can set the priority for job queues, when setting a queue
+ # with Active Job this will be respected.
+ #
+ # Yes: Allows the priority to be set on the job object, at the queue level or
+ # as default configuration option.
#
+ # No: Does not allow the priority of jobs to be configured.
+ #
+ # N/A: The adapter does not support queueing, and therefore sorting them.
+ #
+ # ==== Timeout
+ #
+ # When a job will stop after the allotted time.
+ #
+ # Job: The timeout can be set for each instance of the job class.
+ #
+ # Queue: The timeout is set for all jobs on the queue.
+ #
+ # Global: The adapter is configured that all jobs have a maximum run time.
+ #
+ # N/A: This adapter does not run in a separate process, and therefore timeout
+ # is unsupported.
+ #
+ # ==== Retries
+ #
+ # Job: The number of retries can be set per instance of the job class.
+ #
+ # Yes: The Number of retries can be configured globally, for each instance or
+ # on the queue. This adapter may also present failed instances of the job class
+ # that can be restarted.
+ #
+ # Global: The adapter has a global number of retries.
+ #
+ # N/A: The adapter does not run in a separate process, and therefore doesn't
+ # support retries.
module QueueAdapters
extend ActiveSupport::Autoload
@@ -48,5 +111,18 @@ module ActiveJob
autoload :SneakersAdapter
autoload :SuckerPunchAdapter
autoload :TestAdapter
+
+ ADAPTER = 'Adapter'.freeze
+ private_constant :ADAPTER
+
+ class << self
+ # Returns adapter for specified name.
+ #
+ # ActiveJob::QueueAdapters.lookup(:sidekiq)
+ # # => ActiveJob::QueueAdapters::SidekiqAdapter
+ def lookup(name)
+ const_get(name.to_s.camelize << ADAPTER)
+ end
+ end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
index 2453d065de..17703e3e41 100644
--- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -13,15 +13,13 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :backburner
class BackburnerAdapter
- class << self
- def enqueue(job) #:nodoc:
- Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
- end
+ def enqueue(job) #:nodoc:
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- delay = timestamp - Time.current.to_f
- Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
- end
+ def enqueue_at(job, timestamp) #:nodoc:
+ delay = timestamp - Time.current.to_f
+ Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index 69d9e70de3..ac83da2b9c 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -13,14 +13,16 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
- class << self
- def enqueue(job) #:nodoc:
- Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
- end
+ def enqueue(job) #:nodoc:
+ delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
+ job.provider_job_id = delayed_job.id
+ delayed_job
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
- end
+ def enqueue_at(job, timestamp) #:nodoc:
+ delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
+ job.provider_job_id = delayed_job.id
+ delayed_job
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
index e25d88e723..8ad5f4de07 100644
--- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
@@ -9,14 +9,12 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :inline
class InlineAdapter
- class << self
- def enqueue(job) #:nodoc:
- Base.execute(job.serialize)
- end
+ def enqueue(job) #:nodoc:
+ Base.execute(job.serialize)
+ end
- def enqueue_at(*) #:nodoc:
- raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html")
- end
+ def enqueue_at(*) #:nodoc:
+ raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html"
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
index 30aa5a4670..0e198922fc 100644
--- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
@@ -16,16 +16,18 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :qu
class QuAdapter
- class << self
- def enqueue(job, *args) #:nodoc:
- Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
- payload.instance_variable_set(:@queue, job.queue_name)
- end.push
- end
+ def enqueue(job, *args) #:nodoc:
+ qu_job = Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
+ payload.instance_variable_set(:@queue, job.queue_name)
+ end.push
+
+ # qu_job can be nil depending on the configured backend
+ job.provider_job_id = qu_job.id unless qu_job.nil?
+ qu_job
+ end
- def enqueue_at(job, timestamp, *args) #:nodoc:
- raise NotImplementedError
- end
+ def enqueue_at(job, timestamp, *args) #:nodoc:
+ raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html"
end
class JobWrapper < Qu::Job #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb
index e501fe0368..90947aa98d 100644
--- a/activejob/lib/active_job/queue_adapters/que_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb
@@ -15,14 +15,16 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :que
class QueAdapter
- class << self
- def enqueue(job) #:nodoc:
- JobWrapper.enqueue job.serialize, queue: job.queue_name
- end
+ def enqueue(job) #:nodoc:
+ que_job = JobWrapper.enqueue job.serialize
+ job.provider_job_id = que_job.attrs["job_id"]
+ que_job
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp)
- end
+ def enqueue_at(job, timestamp) #:nodoc:
+ que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp)
+ job.provider_job_id = que_job.attrs["job_id"]
+ que_job
end
class JobWrapper < Que::Job #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index 34c11a68b2..059754a87f 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -17,29 +17,27 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :queue_classic
class QueueClassicAdapter
- class << self
- def enqueue(job) #:nodoc:
- build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
- end
+ def enqueue(job) #:nodoc:
+ build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- queue = build_queue(job.queue_name)
- unless queue.respond_to?(:enqueue_at)
- raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \
- 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
- 'You can implement this yourself or you can use the queue_classic-later gem.'
- end
- queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
+ def enqueue_at(job, timestamp) #:nodoc:
+ queue = build_queue(job.queue_name)
+ unless queue.respond_to?(:enqueue_at)
+ raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \
+ 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
+ 'You can implement this yourself or you can use the queue_classic-later gem.'
end
+ queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
+ end
- # Builds a <tt>QC::Queue</tt> object to schedule jobs on.
- #
- # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass
- # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the
- # <tt>build_queue</tt> method.
- def build_queue(queue_name)
- QC::Queue.new(queue_name)
- end
+ # Builds a <tt>QC::Queue</tt> object to schedule jobs on.
+ #
+ # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass
+ # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the
+ # <tt>build_queue</tt> method.
+ def build_queue(queue_name)
+ QC::Queue.new(queue_name)
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
index 88c6b48fef..417854afd8 100644
--- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
@@ -26,18 +26,16 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :resque
class ResqueAdapter
- class << self
- def enqueue(job) #:nodoc:
- Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
- end
+ def enqueue(job) #:nodoc:
+ Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- unless Resque.respond_to?(:enqueue_at_with_queue)
- raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
- "resque-scheduler gem. Please add it to your Gemfile and run bundle install"
- end
- Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
+ def enqueue_at(job, timestamp) #:nodoc:
+ unless Resque.respond_to?(:enqueue_at_with_queue)
+ raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
+ "resque-scheduler gem. Please add it to your Gemfile and run bundle install"
end
+ Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
index 21005fc728..c321776bf5 100644
--- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -15,22 +15,22 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :sidekiq
class SidekiqAdapter
- class << self
- def enqueue(job) #:nodoc:
- #Sidekiq::Client does not support symbols as keys
- Sidekiq::Client.push \
- 'class' => JobWrapper,
- 'queue' => job.queue_name,
- 'args' => [ job.serialize ]
- end
+ def enqueue(job) #:nodoc:
+ #Sidekiq::Client does not support symbols as keys
+ job.provider_job_id = Sidekiq::Client.push \
+ 'class' => JobWrapper,
+ 'wrapped' => job.class.to_s,
+ 'queue' => job.queue_name,
+ 'args' => [ job.serialize ]
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- Sidekiq::Client.push \
- 'class' => JobWrapper,
- 'queue' => job.queue_name,
- 'args' => [ job.serialize ],
- 'at' => timestamp
- end
+ def enqueue_at(job, timestamp) #:nodoc:
+ job.provider_job_id = Sidekiq::Client.push \
+ 'class' => JobWrapper,
+ 'wrapped' => job.class.to_s,
+ 'queue' => job.queue_name,
+ 'args' => [ job.serialize ],
+ 'at' => timestamp
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
index 6d60a2f303..f102c6567e 100644
--- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -16,19 +16,19 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :sneakers
class SneakersAdapter
- @monitor = Monitor.new
+ def initialize
+ @monitor = Monitor.new
+ end
- class << self
- def enqueue(job) #:nodoc:
- @monitor.synchronize do
- JobWrapper.from_queue job.queue_name
- JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
- end
+ def enqueue(job) #:nodoc:
+ @monitor.synchronize do
+ JobWrapper.from_queue job.queue_name
+ JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- raise NotImplementedError
- end
+ def enqueue_at(job, timestamp) #:nodoc:
+ raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html"
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
index be9e7fd03a..c6c35f8ab4 100644
--- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
@@ -18,14 +18,12 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :sucker_punch
class SuckerPunchAdapter
- class << self
- def enqueue(job) #:nodoc:
- JobWrapper.new.async.perform job.serialize
- end
+ def enqueue(job) #:nodoc:
+ JobWrapper.new.async.perform job.serialize
+ end
- def enqueue_at(job, timestamp) #:nodoc:
- raise NotImplementedError
- end
+ def enqueue_at(job, timestamp) #:nodoc:
+ raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html"
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
index ea9df9a063..9b7b7139f4 100644
--- a/activejob/lib/active_job/queue_adapters/test_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -10,15 +10,9 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :test
class TestAdapter
- delegate :name, to: :class
- attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs)
+ attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter)
attr_writer(:enqueued_jobs, :performed_jobs)
- def initialize
- self.perform_enqueued_jobs = false
- self.perform_enqueued_at_jobs = false
- end
-
# Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
def enqueued_jobs
@enqueued_jobs ||= []
@@ -30,22 +24,37 @@ module ActiveJob
end
def enqueue(job) #:nodoc:
- if perform_enqueued_jobs
- performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name}
- Base.execute job.serialize
- else
- enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name}
- end
+ return if filtered?(job)
+
+ job_data = job_to_hash(job)
+ enqueue_or_perform(perform_enqueued_jobs, job, job_data)
end
def enqueue_at(job, timestamp) #:nodoc:
- if perform_enqueued_at_jobs
- performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp}
+ return if filtered?(job)
+
+ job_data = job_to_hash(job, at: timestamp)
+ enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
+ end
+
+ private
+
+ def job_to_hash(job, extras = {})
+ { job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras)
+ end
+
+ def enqueue_or_perform(perform, job, job_data)
+ if perform
+ performed_jobs << job_data
Base.execute job.serialize
else
- enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp}
+ enqueued_jobs << job_data
end
end
+
+ def filtered?(job)
+ filter && !Array(filter).include?(job.class)
+ end
end
end
end
diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb
index d18656e398..9b307e8dc8 100644
--- a/activejob/lib/active_job/test_helper.rb
+++ b/activejob/lib/active_job/test_helper.rb
@@ -1,3 +1,4 @@
+require 'active_support/core_ext/class/subclasses'
require 'active_support/core_ext/hash/keys'
module ActiveJob
@@ -7,8 +8,17 @@ module ActiveJob
included do
def before_setup
- @old_queue_adapter = queue_adapter
- ActiveJob::Base.queue_adapter = :test
+ test_adapter = ActiveJob::QueueAdapters::TestAdapter.new
+
+ @old_queue_adapters = (ActiveJob::Base.subclasses << ActiveJob::Base).select do |klass|
+ # only override explicitly set adapters, a quirk of `class_attribute`
+ klass.singleton_class.public_instance_methods(false).include?(:_queue_adapter)
+ end.map do |klass|
+ [klass, klass.queue_adapter].tap do
+ klass.queue_adapter = test_adapter
+ end
+ end
+
clear_enqueued_jobs
clear_performed_jobs
super
@@ -16,7 +26,9 @@ module ActiveJob
def after_teardown
super
- ActiveJob::Base.queue_adapter = @old_queue_adapter
+ @old_queue_adapters.each do |(klass, adapter)|
+ klass.queue_adapter = adapter
+ end
end
# Asserts that the number of enqueued jobs matches the given number.
@@ -56,7 +68,7 @@ module ActiveJob
original_count = enqueued_jobs_size(only: only)
yield
new_count = enqueued_jobs_size(only: only)
- assert_equal original_count + number, new_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued"
+ assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued"
else
actual_count = enqueued_jobs_size(only: only)
assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued"
@@ -125,12 +137,34 @@ module ActiveJob
# HelloJob.perform_later('sean')
# end
# end
- def assert_performed_jobs(number)
+ #
+ # The block form supports filtering. If the :only option is specified,
+ # then only the listed job(s) will be performed.
+ #
+ # def test_hello_job
+ # assert_performed_jobs 1, only: HelloJob do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later
+ # end
+ # end
+ #
+ # An array may also be specified, to support testing multiple jobs.
+ #
+ # def test_hello_and_logging_jobs
+ # assert_nothing_raised do
+ # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later('stewie')
+ # RescueJob.perform_later('david')
+ # end
+ # end
+ # end
+ def assert_performed_jobs(number, only: nil)
if block_given?
original_count = performed_jobs.size
- perform_enqueued_jobs { yield }
+ perform_enqueued_jobs(only: only) { yield }
new_count = performed_jobs.size
- assert_equal original_count + number, new_count,
+ assert_equal number, new_count - original_count,
"#{number} jobs expected, but #{new_count - original_count} were performed"
else
performed_jobs_size = performed_jobs.size
@@ -157,11 +191,33 @@ module ActiveJob
# end
# end
#
+ # The block form supports filtering. If the :only option is specified,
+ # then only the listed job(s) will be performed.
+ #
+ # def test_hello_job
+ # assert_performed_jobs 1, only: HelloJob do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later
+ # end
+ # end
+ #
+ # An array may also be specified, to support testing multiple jobs.
+ #
+ # def test_hello_and_logging_jobs
+ # assert_nothing_raised do
+ # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later('stewie')
+ # RescueJob.perform_later('david')
+ # end
+ # end
+ # end
+ #
# Note: This assertion is simply a shortcut for:
#
# assert_performed_jobs 0, &block
- def assert_no_performed_jobs(&block)
- assert_performed_jobs 0, &block
+ def assert_no_performed_jobs(only: nil, &block)
+ assert_performed_jobs 0, only: only, &block
end
# Asserts that the job passed in the block has been enqueued with the given arguments.
@@ -175,9 +231,10 @@ module ActiveJob
original_enqueued_jobs = enqueued_jobs.dup
clear_enqueued_jobs
args.assert_valid_keys(:job, :args, :at, :queue)
+ serialized_args = serialize_args_for_assertion(args)
yield
matching_job = enqueued_jobs.any? do |job|
- args.all? { |key, value| value == job[key] }
+ serialized_args.all? { |key, value| value == job[key] }
end
assert matching_job, "No enqueued job found with #{args}"
ensure
@@ -195,24 +252,31 @@ module ActiveJob
original_performed_jobs = performed_jobs.dup
clear_performed_jobs
args.assert_valid_keys(:job, :args, :at, :queue)
+ serialized_args = serialize_args_for_assertion(args)
perform_enqueued_jobs { yield }
matching_job = performed_jobs.any? do |job|
- args.all? { |key, value| value == job[key] }
+ serialized_args.all? { |key, value| value == job[key] }
end
assert matching_job, "No performed job found with #{args}"
ensure
queue_adapter.performed_jobs = original_performed_jobs + performed_jobs
end
- def perform_enqueued_jobs
- @old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
- @old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs
- queue_adapter.perform_enqueued_jobs = true
- queue_adapter.perform_enqueued_at_jobs = true
- yield
- ensure
- queue_adapter.perform_enqueued_jobs = @old_perform_enqueued_jobs
- queue_adapter.perform_enqueued_at_jobs = @old_perform_enqueued_at_jobs
+ def perform_enqueued_jobs(only: nil)
+ old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
+ old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs
+ old_filter = queue_adapter.filter
+
+ begin
+ queue_adapter.perform_enqueued_jobs = true
+ queue_adapter.perform_enqueued_at_jobs = true
+ queue_adapter.filter = only
+ yield
+ ensure
+ queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs
+ queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs
+ queue_adapter.filter = old_filter
+ end
end
def queue_adapter
@@ -234,11 +298,19 @@ module ActiveJob
def enqueued_jobs_size(only: nil)
if only
- enqueued_jobs.select { |job| job[:job] == only }.size
+ enqueued_jobs.select { |job| job.fetch(:job) == only }.size
else
enqueued_jobs.size
end
end
+
+ def serialize_args_for_assertion(args)
+ serialized_args = args.dup
+ if job_args = serialized_args.delete(:args)
+ serialized_args[:args] = ActiveJob::Arguments.serialize(job_args)
+ end
+ serialized_args
+ end
end
end
end