aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib/active_job
diff options
context:
space:
mode:
Diffstat (limited to 'activejob/lib/active_job')
-rw-r--r--activejob/lib/active_job/arguments.rb55
-rw-r--r--activejob/lib/active_job/base.rb47
-rw-r--r--activejob/lib/active_job/callbacks.rb6
-rw-r--r--activejob/lib/active_job/core.rb41
-rw-r--r--activejob/lib/active_job/enqueuing.rb6
-rw-r--r--activejob/lib/active_job/execution.rb1
-rw-r--r--activejob/lib/active_job/gem_version.rb6
-rw-r--r--activejob/lib/active_job/logging.rb11
-rw-r--r--activejob/lib/active_job/queue_adapter.rb14
-rw-r--r--activejob/lib/active_job/queue_adapters.rb9
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/inline_adapter.rb2
-rw-r--r--activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb10
-rw-r--r--activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/test_adapter.rb42
-rw-r--r--activejob/lib/active_job/queue_name.rb17
-rw-r--r--activejob/lib/active_job/test_helper.rb153
17 files changed, 341 insertions, 87 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb
index 86b85ebdba..622c37098e 100644
--- a/activejob/lib/active_job/arguments.rb
+++ b/activejob/lib/active_job/arguments.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash'
+
module ActiveJob
# Raised when an exception is raised during job arguments deserialization.
#
@@ -24,10 +26,16 @@ module ActiveJob
extend self
TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ]
+ # Serializes a set of arguments. Whitelisted types are returned
+ # as-is. Arrays/Hashes are serialized element by element.
+ # All other types are serialized using GlobalID.
def serialize(arguments)
arguments.map { |argument| serialize_argument(argument) }
end
+ # Deserializes a set of arguments. Whitelisted types are returned
+ # as-is. Arrays/Hashes are deserialized element by element.
+ # All other types are deserialized using GlobalID.
def deserialize(arguments)
arguments.map { |argument| deserialize_argument(argument) }
rescue => e
@@ -36,7 +44,9 @@ module ActiveJob
private
GLOBALID_KEY = '_aj_globalid'.freeze
- private_constant :GLOBALID_KEY
+ SYMBOL_KEYS_KEY = '_aj_symbol_keys'.freeze
+ WITH_INDIFFERENT_ACCESS_KEY = '_aj_hash_with_indifferent_access'.freeze
+ private_constant :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
def serialize_argument(argument)
case argument
@@ -46,10 +56,15 @@ module ActiveJob
{ GLOBALID_KEY => argument.to_global_id.to_s }
when Array
argument.map { |arg| serialize_argument(arg) }
+ when ActiveSupport::HashWithIndifferentAccess
+ result = serialize_hash(argument)
+ result[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true)
+ result
when Hash
- argument.each_with_object({}) do |(key, value), hash|
- hash[serialize_hash_key(key)] = serialize_argument(value)
- end
+ symbol_keys = argument.each_key.grep(Symbol).map(&:to_s)
+ result = serialize_hash(argument)
+ result[SYMBOL_KEYS_KEY] = symbol_keys
+ result
else
raise SerializationError.new("Unsupported argument type: #{argument.class.name}")
end
@@ -67,7 +82,7 @@ module ActiveJob
if serialized_global_id?(argument)
deserialize_global_id argument
else
- deserialize_hash argument
+ deserialize_hash(argument)
end
else
raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
@@ -82,13 +97,27 @@ module ActiveJob
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
+ def serialize_hash(argument)
+ argument.each_with_object({}) do |(key, value), hash|
+ hash[serialize_hash_key(key)] = serialize_argument(value)
+ end
+ end
+
def deserialize_hash(serialized_hash)
- serialized_hash.each_with_object({}.with_indifferent_access) do |(key, value), hash|
- hash[key] = deserialize_argument(value)
+ result = serialized_hash.transform_values { |v| deserialize_argument(v) }
+ if result.delete(WITH_INDIFFERENT_ACCESS_KEY)
+ result = result.with_indifferent_access
+ elsif symbol_keys = result.delete(SYMBOL_KEYS_KEY)
+ result = transform_symbol_keys(result, symbol_keys)
end
+ result
end
- RESERVED_KEYS = [GLOBALID_KEY, GLOBALID_KEY.to_sym]
+ RESERVED_KEYS = [
+ GLOBALID_KEY, GLOBALID_KEY.to_sym,
+ SYMBOL_KEYS_KEY, SYMBOL_KEYS_KEY.to_sym,
+ WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym,
+ ]
private_constant :RESERVED_KEYS
def serialize_hash_key(key)
@@ -101,5 +130,15 @@ module ActiveJob
raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}")
end
end
+
+ def transform_symbol_keys(hash, symbol_keys)
+ hash.transform_keys do |key|
+ if symbol_keys.include?(key)
+ key.to_sym
+ else
+ key
+ end
+ end
+ end
end
end
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index a3bec1f827..fd49b3fda5 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -6,7 +6,52 @@ require 'active_job/execution'
require 'active_job/callbacks'
require 'active_job/logging'
-module ActiveJob
+module ActiveJob #:nodoc:
+ # = Active Job
+ #
+ # Active Job objects can be configured to work with different backend
+ # queuing frameworks. To specify a queue adapter to use:
+ #
+ # ActiveJob::Base.queue_adapter = :inline
+ #
+ # A list of supported adapters can be found in QueueAdapters.
+ #
+ # Active Job objects can be defined by creating a class that inherits
+ # from the ActiveJob::Base class. The only necessary method to
+ # implement is the "perform" method.
+ #
+ # To define an Active Job object:
+ #
+ # class ProcessPhotoJob < ActiveJob::Base
+ # def perform(photo)
+ # photo.watermark!('Rails')
+ # photo.rotate!(90.degrees)
+ # photo.resize_to_fit!(300, 300)
+ # photo.upload!
+ # end
+ # end
+ #
+ # Records that are passed in are serialized/deserialized using Global
+ # ID. More information can be found in Arguments.
+ #
+ # To enqueue a job to be performed as soon the queueing system is free:
+ #
+ # ProcessPhotoJob.perform_later(photo)
+ #
+ # To enqueue a job to be processed at some point in the future:
+ #
+ # ProcessPhotoJob.set(wait_until: Date.tomorrow.noon).perform_later(photo)
+ #
+ # More information can be found in ActiveJob::Core::ClassMethods#set
+ #
+ # A job can also be processed immediately without sending to the queue:
+ #
+ # ProcessPhotoJob.perform_now(photo)
+ #
+ # == Exceptions
+ #
+ # * DeserializationError - Error class for deserialization errors.
+ # * SerializationError - Error class for serialization errors.
class Base
include Core
include QueueAdapter
diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb
index 29e2a878b4..2b6149e84e 100644
--- a/activejob/lib/active_job/callbacks.rb
+++ b/activejob/lib/active_job/callbacks.rb
@@ -3,8 +3,8 @@ require 'active_support/callbacks'
module ActiveJob
# = Active Job Callbacks
#
- # Active Job provides hooks during the lifecycle of a job. Callbacks allow you
- # to trigger logic during the lifecycle of a job. Available callbacks are:
+ # Active Job provides hooks during the life cycle of a job. Callbacks allow you
+ # to trigger logic during the life cycle of a job. Available callbacks are:
#
# * <tt>before_enqueue</tt>
# * <tt>around_enqueue</tt>
@@ -22,6 +22,8 @@ module ActiveJob
define_callbacks :enqueue
end
+ # These methods will be included into any Active Job object, adding
+ # callbacks for +perform+ and +enqueue+ methods.
module ClassMethods
# Defines a callback that will get called right before the
# job's perform method is executed.
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index f55db5a588..ddd7d1361c 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -17,13 +17,13 @@ module ActiveJob
attr_writer :queue_name
end
+ # These methods will be included into any Active Job object, adding
+ # helpers for de/serialization and creation of job instances.
module ClassMethods
# Creates a new job instance from a hash created with +serialize+
def deserialize(job_data)
- job = job_data['job_class'].constantize.new
- job.job_id = job_data['job_id']
- job.queue_name = job_data['queue_name']
- job.serialized_arguments = job_data['arguments']
+ job = job_data['job_class'].constantize.new
+ job.deserialize(job_data)
job
end
@@ -48,8 +48,8 @@ module ActiveJob
end
end
- # Creates a new job instance. Takes as arguments the arguments that
- # will be passed to the perform method.
+ # Creates a new job instance. Takes the arguments that will be
+ # passed to the perform method.
def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@@ -67,6 +67,32 @@ module ActiveJob
}
end
+ # Attaches the stored job data to the current instance. Receives a hash
+ # returned from +serialize+
+ #
+ # ==== Examples
+ #
+ # class DeliverWebhookJob < ActiveJob::Base
+ # def serialize
+ # super.merge('attempt_number' => (@attempt_number || 0) + 1)
+ # end
+ #
+ # def deserialize(job_data)
+ # super
+ # @attempt_number = job_data['attempt_number']
+ # end
+ #
+ # rescue_from(TimeoutError) do |exception|
+ # raise exception if @attempt_number > 5
+ # retry_job(wait: 10)
+ # end
+ # end
+ def deserialize(job_data)
+ self.job_id = job_data['job_id']
+ self.queue_name = job_data['queue_name']
+ self.serialized_arguments = job_data['arguments']
+ end
+
private
def deserialize_arguments_if_needed
if defined?(@serialized_arguments) && @serialized_arguments.present?
@@ -84,6 +110,3 @@ module ActiveJob
end
end
end
-
-
-
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 74bcc1fa5d..430c17e1bf 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -4,13 +4,14 @@ module ActiveJob
module Enqueuing
extend ActiveSupport::Concern
+ # Includes the +perform_later+ method for job initialization.
module ClassMethods
# Push a job onto the queue. The arguments must be legal JSON types
# (string, int, float, nil, true, false, hash or array) or
# GlobalID::Identification instances. Arbitrary Ruby objects
# are not supported.
#
- # Returns an instance of the job class queued with args available in
+ # Returns an instance of the job class queued with arguments available in
# Job#arguments.
def perform_later(*args)
job_or_instantiate(*args).enqueue
@@ -22,7 +23,7 @@ module ActiveJob
end
end
- # Reschedule the job to be re-executed. This is useful in combination
+ # Reschedules the job to be re-executed. This is useful in combination
# with the +rescue_from+ option. When you rescue an exception from your job
# you can ask Active Job to retry performing your job.
#
@@ -37,6 +38,7 @@ module ActiveJob
# rescue_from(ErrorLoadingSite) do
# retry_job queue: :low_priority
# end
+ #
# def perform(*args)
# # raise ErrorLoadingSite if cannot scrape
# end
diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb
index 7ff857206d..79d232da4a 100644
--- a/activejob/lib/active_job/execution.rb
+++ b/activejob/lib/active_job/execution.rb
@@ -6,6 +6,7 @@ module ActiveJob
extend ActiveSupport::Concern
include ActiveSupport::Rescuable
+ # Includes methods for executing and performing jobs instantly.
module ClassMethods
# Performs the job immediately.
#
diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb
index f80e6563b8..27a5de93f4 100644
--- a/activejob/lib/active_job/gem_version.rb
+++ b/activejob/lib/active_job/gem_version.rb
@@ -5,10 +5,10 @@ module ActiveJob
end
module VERSION
- MAJOR = 4
- MINOR = 2
+ MAJOR = 5
+ MINOR = 0
TINY = 0
- PRE = "beta2"
+ PRE = "alpha"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index bb96668cfb..cd29e6908e 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -3,7 +3,7 @@ require 'active_support/tagged_logging'
require 'active_support/logger'
module ActiveJob
- module Logging
+ module Logging #:nodoc:
extend ActiveSupport::Concern
included do
@@ -75,7 +75,7 @@ module ActiveJob
def perform(event)
info do
job = event.payload[:job]
- "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms"
+ "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2)}ms"
end
end
@@ -85,7 +85,12 @@ module ActiveJob
end
def args_info(job)
- job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : ""
+ if job.arguments.any?
+ ' with arguments: ' +
+ job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg.inspect }.join(', ')
+ else
+ ''
+ end
end
def scheduled_at(event)
diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb
index fb54aec75e..d610d30e01 100644
--- a/activejob/lib/active_job/queue_adapter.rb
+++ b/activejob/lib/active_job/queue_adapter.rb
@@ -2,12 +2,18 @@ require 'active_job/queue_adapters/inline_adapter'
require 'active_support/core_ext/string/inflections'
module ActiveJob
- module QueueAdapter
+ # The <tt>ActiveJob::QueueAdapter</tt> module is used to load the
+ # correct adapter. The default queue adapter is the :inline queue.
+ module QueueAdapter #:nodoc:
extend ActiveSupport::Concern
+ # Includes the setter method for changing the active queue adapter.
module ClassMethods
mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter }
+ # Specify the backend queue provider. The default queue adapter
+ # is the :inline queue. See QueueAdapters for more
+ # information.
def queue_adapter=(name_or_adapter)
@@queue_adapter = \
case name_or_adapter
@@ -15,8 +21,8 @@ module ActiveJob
ActiveJob::QueueAdapters::TestAdapter.new
when Symbol, String
load_adapter(name_or_adapter)
- when Class
- name_or_adapter
+ else
+ name_or_adapter if name_or_adapter.respond_to?(:enqueue)
end
end
@@ -26,4 +32,4 @@ module ActiveJob
end
end
end
-end \ No newline at end of file
+end
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index e865c901ce..4b91c93dbe 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -7,20 +7,21 @@ module ActiveJob
# * {Delayed Job}[https://github.com/collectiveidea/delayed_job]
# * {Qu}[https://github.com/bkeepers/qu]
# * {Que}[https://github.com/chanks/que]
- # * {QueueClassic 2.x}[https://github.com/ryandotsmith/queue_classic/tree/v2.2.3]
+ # * {queue_classic}[https://github.com/QueueClassic/queue_classic]
# * {Resque 1.x}[https://github.com/resque/resque/tree/1-x-stable]
# * {Sidekiq}[http://sidekiq.org]
# * {Sneakers}[https://github.com/jondot/sneakers]
# * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
#
- # #### Backends Features
+ # === Backends Features
#
# | | Async | Queues | Delayed | Priorities | Timeout | Retries |
# |-------------------|-------|--------|-----------|------------|---------|---------|
# | Backburner | Yes | Yes | Yes | Yes | Job | Global |
# | Delayed Job | Yes | Yes | Yes | Job | Global | Global |
+ # | Qu | Yes | Yes | No | No | No | Global |
# | Que | Yes | Yes | Yes | Job | No | Job |
- # | Queue Classic | Yes | Yes | No* | No | No | No |
+ # | queue_classic | Yes | Yes | No* | No | No | No |
# | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes |
# | Sidekiq | Yes | Yes | Yes | Queue | No | Job |
# | Sneakers | Yes | Yes | No | Queue | Queue | No |
@@ -29,7 +30,7 @@ module ActiveJob
# | Active Job | Yes | Yes | Yes | No | No | No |
#
# NOTE:
- # Queue Classic does not support Job scheduling. However you can implement this
+ # queue_classic does not support Job scheduling. However you can implement this
# yourself or you can use the queue_classic-later gem. See the documentation for
# ActiveJob::QueueAdapters::QueueClassicAdapter.
#
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index 4d27c4fff8..69d9e70de3 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -6,10 +6,10 @@ module ActiveJob
#
# Delayed::Job (or DJ) encapsulates the common pattern of asynchronously
# executing longer tasks in the background. Although DJ can have many
- # storage backends one of the most used is based on Active Record.
+ # storage backends, one of the most used is based on Active Record.
# Read more about Delayed Job {here}[https://github.com/collectiveidea/delayed_job].
#
- # To use Delayed Job set the queue_adapter config to +:delayed_job+.
+ # To use Delayed Job, set the queue_adapter config to +:delayed_job+.
#
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
index e498454909..e25d88e723 100644
--- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
@@ -15,7 +15,7 @@ module ActiveJob
end
def enqueue_at(*) #:nodoc:
- raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at https://github.com/rails/activejob")
+ raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html")
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index f160932578..34c11a68b2 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -2,7 +2,7 @@ require 'queue_classic'
module ActiveJob
module QueueAdapters
- # == Queue Classic adapter for Active Job
+ # == queue_classic adapter for Active Job
#
# queue_classic provides a simple interface to a PostgreSQL-backed message
# queue. queue_classic specializes in concurrent locking and minimizing
@@ -11,9 +11,9 @@ module ActiveJob
# production environment and that adding another dependency (e.g. redis,
# beanstalkd, 0mq) is undesirable.
#
- # Read more about Queue Classic {here}[https://github.com/ryandotsmith/queue_classic].
+ # Read more about queue_classic {here}[https://github.com/QueueClassic/queue_classic].
#
- # To use Queue Classic set the queue_adapter config to +:queue_classic+.
+ # To use queue_classic set the queue_adapter config to +:queue_classic+.
#
# Rails.application.config.active_job.queue_adapter = :queue_classic
class QueueClassicAdapter
@@ -25,8 +25,8 @@ module ActiveJob
def enqueue_at(job, timestamp) #:nodoc:
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
- raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \
- 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. '
+ raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \
+ 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
'You can implement this yourself or you can use the queue_classic-later gem.'
end
queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
index 7d80a6fd7a..21005fc728 100644
--- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -21,8 +21,7 @@ module ActiveJob
Sidekiq::Client.push \
'class' => JobWrapper,
'queue' => job.queue_name,
- 'args' => [ job.serialize ],
- 'retry' => true
+ 'args' => [ job.serialize ]
end
def enqueue_at(job, timestamp) #:nodoc:
@@ -30,7 +29,6 @@ module ActiveJob
'class' => JobWrapper,
'queue' => job.queue_name,
'args' => [ job.serialize ],
- 'retry' => true,
'at' => timestamp
end
end
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
index e4fdf60008..c9e2bdca27 100644
--- a/activejob/lib/active_job/queue_adapters/test_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -11,9 +11,14 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :test
class TestAdapter
delegate :name, to: :class
- attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs)
+ attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter)
attr_writer(:enqueued_jobs, :performed_jobs)
+ def initialize
+ self.perform_enqueued_jobs = false
+ self.perform_enqueued_at_jobs = false
+ end
+
# Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
def enqueued_jobs
@enqueued_jobs ||= []
@@ -25,22 +30,33 @@ module ActiveJob
end
def enqueue(job) #:nodoc:
- if perform_enqueued_jobs
- performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
- job.perform_now
- else
- enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name}
- end
+ return if filtered?(job)
+
+ job_data = { job: job.class, args: job.serialize['arguments'], queue: job.queue_name }
+ enqueue_or_perform(perform_enqueued_jobs, job, job_data)
end
def enqueue_at(job, timestamp) #:nodoc:
- if perform_enqueued_at_jobs
- performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
- job.perform_now
- else
- enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp}
- end
+ return if filtered?(job)
+
+ job_data = { job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp }
+ enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
end
+
+ private
+
+ def enqueue_or_perform(perform, job, job_data)
+ if perform
+ performed_jobs << job_data
+ Base.execute job.serialize
+ else
+ enqueued_jobs << job_data
+ end
+ end
+
+ def filtered?(job)
+ filter && !Array(filter).include?(job.class)
+ end
end
end
end
diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb
index 45acb71605..9ae0345120 100644
--- a/activejob/lib/active_job/queue_name.rb
+++ b/activejob/lib/active_job/queue_name.rb
@@ -2,10 +2,20 @@ module ActiveJob
module QueueName
extend ActiveSupport::Concern
+ # Includes the ability to override the default queue name and prefix.
module ClassMethods
mattr_accessor(:queue_name_prefix)
mattr_accessor(:default_queue_name) { "default" }
+ # Specifies the name of the queue to process the job on.
+ #
+ # class PublishToFeedJob < ActiveJob::Base
+ # queue_as :feeds
+ #
+ # def perform(post)
+ # post.to_feed!
+ # end
+ # end
def queue_as(part_name=nil, &block)
if block_given?
self.queue_name = block
@@ -15,15 +25,18 @@ module ActiveJob
end
def queue_name_from_part(part_name) #:nodoc:
- queue_name = part_name.to_s.presence || default_queue_name
+ queue_name = part_name || default_queue_name
name_parts = [queue_name_prefix.presence, queue_name]
- name_parts.compact.join('_')
+ name_parts.compact.join(queue_name_delimiter)
end
end
included do
class_attribute :queue_name, instance_accessor: false
+ class_attribute :queue_name_delimiter, instance_accessor: false
+
self.queue_name = default_queue_name
+ self.queue_name_delimiter = '_' # set default delimiter to '_'
end
# Returns the name of the queue the job will be run on
diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb
index af62fae9b9..25bc99a4f8 100644
--- a/activejob/lib/active_job/test_helper.rb
+++ b/activejob/lib/active_job/test_helper.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash/keys'
+
module ActiveJob
# Provides helper methods for testing Active Job
module TestHelper
@@ -40,20 +42,28 @@ module ActiveJob
# HelloJob.perform_later('rafael')
# end
# end
- def assert_enqueued_jobs(number)
+ #
+ # The number of times a specific job is enqueued can be asserted.
+ #
+ # def test_logging_job
+ # assert_enqueued_jobs 2, only: LoggingJob do
+ # LoggingJob.perform_later
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ def assert_enqueued_jobs(number, only: nil)
if block_given?
- original_count = enqueued_jobs.size
+ original_count = enqueued_jobs_size(only: only)
yield
- new_count = enqueued_jobs.size
- assert_equal original_count + number, new_count,
- "#{number} jobs expected, but #{new_count - original_count} were enqueued"
+ new_count = enqueued_jobs_size(only: only)
+ assert_equal original_count + number, new_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued"
else
- enqueued_jobs_size = enqueued_jobs.size
- assert_equal number, enqueued_jobs_size, "#{number} jobs expected, but #{enqueued_jobs_size} were enqueued"
+ actual_count = enqueued_jobs_size(only: only)
+ assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued"
end
end
- # Assert that no job have been enqueued.
+ # Asserts that no jobs have been enqueued.
#
# def test_jobs
# assert_no_enqueued_jobs
@@ -69,21 +79,37 @@ module ActiveJob
# end
# end
#
+ # It can be asserted that no jobs of a specific kind are enqueued:
+ #
+ # def test_no_logging
+ # assert_no_enqueued_jobs only: LoggingJob do
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ #
# Note: This assertion is simply a shortcut for:
#
- # assert_enqueued_jobs 0
- def assert_no_enqueued_jobs(&block)
- assert_enqueued_jobs 0, &block
+ # assert_enqueued_jobs 0, &block
+ def assert_no_enqueued_jobs(only: nil, &block)
+ assert_enqueued_jobs 0, only: only, &block
end
# Asserts that the number of performed jobs matches the given number.
+ # If no block is passed, <tt>perform_enqueued_jobs</tt>
+ # must be called around the job call.
#
# def test_jobs
# assert_performed_jobs 0
- # HelloJob.perform_later('xavier')
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('xavier')
+ # end
# assert_performed_jobs 1
- # HelloJob.perform_later('yves')
- # assert_performed_jobs 2
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('yves')
+ # assert_performed_jobs 2
+ # end
# end
#
# If a block is passed, that block should cause the specified number of
@@ -99,10 +125,32 @@ module ActiveJob
# HelloJob.perform_later('sean')
# end
# end
- def assert_performed_jobs(number)
+ #
+ # The block form supports filtering. If the :only option is specified,
+ # then only the listed job(s) will be performed.
+ #
+ # def test_hello_job
+ # assert_performed_jobs 1, only: HelloJob do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later
+ # end
+ # end
+ #
+ # An array may also be specified, to support testing multiple jobs.
+ #
+ # def test_hello_and_logging_jobs
+ # assert_nothing_raised do
+ # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later('stewie')
+ # RescueJob.perform_later('david')
+ # end
+ # end
+ # end
+ def assert_performed_jobs(number, only: nil)
if block_given?
original_count = performed_jobs.size
- yield
+ perform_enqueued_jobs(only: only) { yield }
new_count = performed_jobs.size
assert_equal original_count + number, new_count,
"#{number} jobs expected, but #{new_count - original_count} were performed"
@@ -116,8 +164,11 @@ module ActiveJob
#
# def test_jobs
# assert_no_performed_jobs
- # HelloJob.perform_later('matthew')
- # assert_performed_jobs 1
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('matthew')
+ # assert_performed_jobs 1
+ # end
# end
#
# If a block is passed, that block should not cause any job to be performed.
@@ -128,16 +179,38 @@ module ActiveJob
# end
# end
#
+ # The block form supports filtering. If the :only option is specified,
+ # then only the listed job(s) will be performed.
+ #
+ # def test_hello_job
+ # assert_performed_jobs 1, only: HelloJob do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later
+ # end
+ # end
+ #
+ # An array may also be specified, to support testing multiple jobs.
+ #
+ # def test_hello_and_logging_jobs
+ # assert_nothing_raised do
+ # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later('stewie')
+ # RescueJob.perform_later('david')
+ # end
+ # end
+ # end
+ #
# Note: This assertion is simply a shortcut for:
#
- # assert_performed_jobs 0
- def assert_no_performed_jobs(&block)
- assert_performed_jobs 0, &block
+ # assert_performed_jobs 0, &block
+ def assert_no_performed_jobs(only: nil, &block)
+ assert_performed_jobs 0, only: only, &block
end
# Asserts that the job passed in the block has been enqueued with the given arguments.
#
- # def assert_enqueued_job
+ # def test_assert_enqueued_with
# assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do
# MyJob.perform_later(1,2,3)
# end
@@ -146,9 +219,10 @@ module ActiveJob
original_enqueued_jobs = enqueued_jobs.dup
clear_enqueued_jobs
args.assert_valid_keys(:job, :args, :at, :queue)
+ serialized_args = serialize_args_for_assertion(args)
yield
matching_job = enqueued_jobs.any? do |job|
- args.all? { |key, value| value == job[key] }
+ serialized_args.all? { |key, value| value == job[key] }
end
assert matching_job, "No enqueued job found with #{args}"
ensure
@@ -166,15 +240,28 @@ module ActiveJob
original_performed_jobs = performed_jobs.dup
clear_performed_jobs
args.assert_valid_keys(:job, :args, :at, :queue)
- yield
+ serialized_args = serialize_args_for_assertion(args)
+ perform_enqueued_jobs { yield }
matching_job = performed_jobs.any? do |job|
- args.all? { |key, value| value == job[key] }
+ serialized_args.all? { |key, value| value == job[key] }
end
assert matching_job, "No performed job found with #{args}"
ensure
queue_adapter.performed_jobs = original_performed_jobs + performed_jobs
end
+ def perform_enqueued_jobs(only: nil)
+ @old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
+ @old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs
+ queue_adapter.perform_enqueued_jobs = true
+ queue_adapter.perform_enqueued_at_jobs = true
+ queue_adapter.filter = only
+ yield
+ ensure
+ queue_adapter.perform_enqueued_jobs = @old_perform_enqueued_jobs
+ queue_adapter.perform_enqueued_at_jobs = @old_perform_enqueued_at_jobs
+ end
+
def queue_adapter
ActiveJob::Base.queue_adapter
end
@@ -191,6 +278,22 @@ module ActiveJob
def clear_performed_jobs
performed_jobs.clear
end
+
+ def enqueued_jobs_size(only: nil)
+ if only
+ enqueued_jobs.select { |job| job[:job] == only }.size
+ else
+ enqueued_jobs.size
+ end
+ end
+
+ def serialize_args_for_assertion(args)
+ serialized_args = args.dup
+ if job_args = serialized_args.delete(:args)
+ serialized_args[:args] = ActiveJob::Arguments.serialize(job_args)
+ end
+ serialized_args
+ end
end
end
end