aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib
diff options
context:
space:
mode:
Diffstat (limited to 'activejob/lib')
-rw-r--r--activejob/lib/active_job/arguments.rb33
-rw-r--r--activejob/lib/active_job/core.rb40
-rw-r--r--activejob/lib/active_job/exceptions.rb29
-rw-r--r--activejob/lib/active_job/execution.rb6
-rw-r--r--activejob/lib/active_job/logging.rb32
-rw-r--r--activejob/lib/active_job/queue_adapter.rb2
-rw-r--r--activejob/lib/active_job/queue_adapters.rb2
-rw-r--r--activejob/lib/active_job/queue_adapters/async_adapter.rb2
-rw-r--r--activejob/lib/active_job/queue_adapters/backburner_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/test_adapter.rb20
-rw-r--r--activejob/lib/active_job/railtie.rb4
-rw-r--r--activejob/lib/active_job/test_helper.rb263
12 files changed, 326 insertions, 111 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb
index 86bb0c5540..a86a82c6e3 100644
--- a/activejob/lib/active_job/arguments.rb
+++ b/activejob/lib/active_job/arguments.rb
@@ -24,18 +24,20 @@ module ActiveJob
module Arguments
extend self
# :nodoc:
- TYPE_WHITELIST = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ]
+ PERMITTED_TYPES = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ]
- # Serializes a set of arguments. Whitelisted types are returned
- # as-is. Arrays/Hashes are serialized element by element.
- # All other types are serialized using GlobalID.
+ # Serializes a set of arguments. Intrinsic types that can safely be
+ # serialized without mutation are returned as-is. Arrays/Hashes are
+ # serialized element by element. All other types are serialized using
+ # GlobalID.
def serialize(arguments)
arguments.map { |argument| serialize_argument(argument) }
end
- # Deserializes a set of arguments. Whitelisted types are returned
- # as-is. Arrays/Hashes are deserialized element by element.
- # All other types are deserialized using GlobalID.
+ # Deserializes a set of arguments. Intrinsic types that can safely be
+ # deserialized without mutation are returned as-is. Arrays/Hashes are
+ # deserialized element by element. All other types are deserialized using
+ # GlobalID.
def deserialize(arguments)
arguments.map { |argument| deserialize_argument(argument) }
rescue
@@ -45,11 +47,11 @@ module ActiveJob
private
# :nodoc:
- GLOBALID_KEY = "_aj_globalid".freeze
+ GLOBALID_KEY = "_aj_globalid"
# :nodoc:
- SYMBOL_KEYS_KEY = "_aj_symbol_keys".freeze
+ SYMBOL_KEYS_KEY = "_aj_symbol_keys"
# :nodoc:
- WITH_INDIFFERENT_ACCESS_KEY = "_aj_hash_with_indifferent_access".freeze
+ WITH_INDIFFERENT_ACCESS_KEY = "_aj_hash_with_indifferent_access"
# :nodoc:
OBJECT_SERIALIZER_KEY = "_aj_serialized"
@@ -60,11 +62,11 @@ module ActiveJob
OBJECT_SERIALIZER_KEY, OBJECT_SERIALIZER_KEY.to_sym,
WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym,
]
- private_constant :RESERVED_KEYS
+ private_constant :RESERVED_KEYS, :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
def serialize_argument(argument)
case argument
- when *TYPE_WHITELIST
+ when *PERMITTED_TYPES
argument
when GlobalID::Identification
convert_to_global_id_hash(argument)
@@ -88,7 +90,7 @@ module ActiveJob
case argument
when String
GlobalID::Locator.locate(argument) || argument
- when *TYPE_WHITELIST
+ when *PERMITTED_TYPES
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
@@ -145,7 +147,10 @@ module ActiveJob
end
def transform_symbol_keys(hash, symbol_keys)
- hash.transform_keys do |key|
+ # NOTE: HashWithIndifferentAccess#transform_keys always
+ # returns stringified keys with indifferent access
+ # so we call #to_h here to ensure keys are symbolized.
+ hash.to_h.transform_keys do |key|
if symbol_keys.include?(key)
key.to_sym
else
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index 61d402cfca..62bb5861bb 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -6,35 +6,33 @@ module ActiveJob
module Core
extend ActiveSupport::Concern
- included do
- # Job arguments
- attr_accessor :arguments
- attr_writer :serialized_arguments
+ # Job arguments
+ attr_accessor :arguments
+ attr_writer :serialized_arguments
- # Timestamp when the job should be performed
- attr_accessor :scheduled_at
+ # Timestamp when the job should be performed
+ attr_accessor :scheduled_at
- # Job Identifier
- attr_accessor :job_id
+ # Job Identifier
+ attr_accessor :job_id
- # Queue in which the job will reside.
- attr_writer :queue_name
+ # Queue in which the job will reside.
+ attr_writer :queue_name
- # Priority that the job will have (lower is more priority).
- attr_writer :priority
+ # Priority that the job will have (lower is more priority).
+ attr_writer :priority
- # ID optionally provided by adapter
- attr_accessor :provider_job_id
+ # ID optionally provided by adapter
+ attr_accessor :provider_job_id
- # Number of times this job has been executed (which increments on every retry, like after an exception).
- attr_accessor :executions
+ # Number of times this job has been executed (which increments on every retry, like after an exception).
+ attr_accessor :executions
- # I18n.locale to be used during the job.
- attr_accessor :locale
+ # I18n.locale to be used during the job.
+ attr_accessor :locale
- # Timezone to be used during the job.
- attr_accessor :timezone
- end
+ # Timezone to be used during the job.
+ attr_accessor :timezone
# These methods will be included into any Active Job object, adding
# helpers for de/serialization and creation of job instances.
diff --git a/activejob/lib/active_job/exceptions.rb b/activejob/lib/active_job/exceptions.rb
index 1e57dbcb1c..bc9e168971 100644
--- a/activejob/lib/active_job/exceptions.rb
+++ b/activejob/lib/active_job/exceptions.rb
@@ -9,6 +9,7 @@ module ActiveJob
module ClassMethods
# Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts.
+ # The number of attempts includes the total executions of a job, not just the retried executions.
# If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to
# bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a
# holding queue for inspection.
@@ -21,7 +22,8 @@ module ActiveJob
# as a computing proc that the number of executions so far as an argument, or as a symbol reference of
# <tt>:exponentially_longer</tt>, which applies the wait algorithm of <tt>(executions ** 4) + 2</tt>
# (first wait 3s, then 18s, then 83s, etc)
- # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts)
+ # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts),
+ # attempts here refers to the total number of times the job is executed, not just retried executions
# * <tt>:queue</tt> - Re-enqueues the job on a different queue
# * <tt>:priority</tt> - Re-enqueues the job with a different priority
#
@@ -45,13 +47,14 @@ module ActiveJob
def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil)
rescue_from(*exceptions) do |error|
if executions < attempts
- logger.error "Retrying #{self.class} in #{wait} seconds, due to a #{error.class}. The original exception was #{error.cause.inspect}."
- retry_job wait: determine_delay(wait), queue: queue, priority: priority
+ retry_job wait: determine_delay(wait), queue: queue, priority: priority, error: error
else
if block_given?
- yield self, error
+ instrument :retry_stopped, error: error do
+ yield self, error
+ end
else
- logger.error "Stopped retrying #{self.class} due to a #{error.class}, which reoccurred on #{executions} attempts. The original exception was #{error.cause.inspect}."
+ instrument :retry_stopped, error: error
raise error
end
end
@@ -78,10 +81,8 @@ module ActiveJob
# end
def discard_on(*exceptions)
rescue_from(*exceptions) do |error|
- if block_given?
- yield self, error
- else
- logger.error "Discarded #{self.class} due to a #{error.class}. The original exception was #{error.cause.inspect}."
+ instrument :discard, error: error do
+ yield self, error if block_given?
end
end
end
@@ -109,7 +110,9 @@ module ActiveJob
# end
# end
def retry_job(options = {})
- enqueue options
+ instrument :enqueue_retry, options.slice(:error, :wait) do
+ enqueue options
+ end
end
private
@@ -130,5 +133,11 @@ module ActiveJob
raise "Couldn't determine a delay based on #{seconds_or_duration_or_algorithm.inspect}"
end
end
+
+ def instrument(name, error: nil, wait: nil, &block)
+ payload = { job: self, adapter: self.class.queue_adapter, error: error, wait: wait }
+
+ ActiveSupport::Notifications.instrument("#{name}.active_job", payload, &block)
+ end
end
end
diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb
index d75be376ec..f5a343311f 100644
--- a/activejob/lib/active_job/execution.rb
+++ b/activejob/lib/active_job/execution.rb
@@ -31,11 +31,11 @@ module ActiveJob
#
# MyJob.new(*args).perform_now
def perform_now
+ # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
+ self.executions = (executions || 0) + 1
+
deserialize_arguments_if_needed
run_callbacks :perform do
- # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
- self.executions = (executions || 0) + 1
-
perform(*arguments)
end
rescue => exception
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index 9ffd60ad53..416be83c24 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -88,6 +88,38 @@ module ActiveJob
end
end
+ def enqueue_retry(event)
+ job = event.payload[:job]
+ ex = event.payload[:error]
+ wait = event.payload[:wait]
+
+ info do
+ if ex
+ "Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
+ else
+ "Retrying #{job.class} in #{wait.to_i} seconds."
+ end
+ end
+ end
+
+ def retry_stopped(event)
+ job = event.payload[:job]
+ ex = event.payload[:error]
+
+ error do
+ "Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts."
+ end
+ end
+
+ def discard(event)
+ job = event.payload[:job]
+ ex = event.payload[:error]
+
+ error do
+ "Discarded #{job.class} due to a #{ex.class}."
+ end
+ end
+
private
def queue_name(event)
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb
index 006a683b85..954bfd1dd1 100644
--- a/activejob/lib/active_job/queue_adapter.rb
+++ b/activejob/lib/active_job/queue_adapter.rb
@@ -22,6 +22,8 @@ module ActiveJob
_queue_adapter
end
+ # Returns string denoting the name of the configured queue adapter.
+ # By default returns +"async"+.
def queue_adapter_name
_queue_adapter_name
end
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index 00c7b407b1..3e3a474fbb 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -121,7 +121,7 @@ module ActiveJob
autoload :SuckerPunchAdapter
autoload :TestAdapter
- ADAPTER = "Adapter".freeze
+ ADAPTER = "Adapter"
private_constant :ADAPTER
class << self
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb
index ebf6f384e3..53a7e3d53e 100644
--- a/activejob/lib/active_job/queue_adapters/async_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb
@@ -31,7 +31,7 @@ module ActiveJob
# jobs. Since jobs share a single thread pool, long-running jobs will block
# short-lived jobs. Fine for dev/test; bad for production.
class AsyncAdapter
- # See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options.
+ # See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html] for executor options.
def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)
end
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
index 0ba93c6e0b..7dc49310ac 100644
--- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -16,12 +16,12 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :backburner
class BackburnerAdapter
def enqueue(job) #:nodoc:
- Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
+ Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority)
end
def enqueue_at(job, timestamp) #:nodoc:
delay = timestamp - Time.current.to_f
- Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
+ Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority, delay: delay)
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
index 885f9ff01c..f73ad444ba 100644
--- a/activejob/lib/active_job/queue_adapters/test_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -12,7 +12,7 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :test
class TestAdapter
- attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject)
+ attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject, :queue)
attr_writer(:enqueued_jobs, :performed_jobs)
# Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
@@ -29,14 +29,14 @@ module ActiveJob
return if filtered?(job)
job_data = job_to_hash(job)
- enqueue_or_perform(perform_enqueued_jobs, job, job_data)
+ perform_or_enqueue(perform_enqueued_jobs, job, job_data)
end
def enqueue_at(job, timestamp) #:nodoc:
return if filtered?(job)
job_data = job_to_hash(job, at: timestamp)
- enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
+ perform_or_enqueue(perform_enqueued_at_jobs, job, job_data)
end
private
@@ -44,7 +44,7 @@ module ActiveJob
{ job: job.class, args: job.serialize.fetch("arguments"), queue: job.queue_name }.merge!(extras)
end
- def enqueue_or_perform(perform, job, job_data)
+ def perform_or_enqueue(perform, job, job_data)
if perform
performed_jobs << job_data
Base.execute job.serialize
@@ -54,12 +54,20 @@ module ActiveJob
end
def filtered?(job)
+ filtered_queue?(job) || filtered_job_class?(job)
+ end
+
+ def filtered_queue?(job)
+ if queue
+ job.queue_name != queue.to_s
+ end
+ end
+
+ def filtered_job_class?(job)
if filter
!Array(filter).include?(job.class)
elsif reject
Array(reject).include?(job.class)
- else
- false
end
end
end
diff --git a/activejob/lib/active_job/railtie.rb b/activejob/lib/active_job/railtie.rb
index d0294854d3..ecc0908d5f 100644
--- a/activejob/lib/active_job/railtie.rb
+++ b/activejob/lib/active_job/railtie.rb
@@ -30,6 +30,10 @@ module ActiveJob
send(k, v) if respond_to? k
end
end
+
+ ActiveSupport.on_load(:action_dispatch_integration_test) do
+ include ActiveJob::TestHelper
+ end
end
initializer "active_job.set_reloader_hook" do |app|
diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb
index 04cde28a96..0deb68d0d2 100644
--- a/activejob/lib/active_job/test_helper.rb
+++ b/activejob/lib/active_job/test_helper.rb
@@ -52,7 +52,7 @@ module ActiveJob
queue_adapter_changed_jobs.each { |klass| klass.disable_test_adapter }
end
- # Specifies the queue adapter to use with all active job test helpers.
+ # Specifies the queue adapter to use with all Active Job test helpers.
#
# Returns an instance of the queue adapter and defaults to
# <tt>ActiveJob::QueueAdapters::TestAdapter</tt>.
@@ -75,7 +75,7 @@ module ActiveJob
# assert_enqueued_jobs 2
# end
#
- # If a block is passed, that block will cause the specified number of
+ # If a block is passed, asserts that the block will cause the specified number of
# jobs to be enqueued.
#
# def test_jobs_again
@@ -89,7 +89,7 @@ module ActiveJob
# end
# end
#
- # The number of times a specific job was enqueued can be asserted.
+ # Asserts the number of times a specific job was enqueued by passing +:only+ option.
#
# def test_logging_job
# assert_enqueued_jobs 1, only: LoggingJob do
@@ -98,7 +98,7 @@ module ActiveJob
# end
# end
#
- # The number of times a job except specific class was enqueued can be asserted.
+ # Asserts the number of times a job except specific class was enqueued by passing +:except+ option.
#
# def test_logging_job
# assert_enqueued_jobs 1, except: HelloJob do
@@ -107,7 +107,7 @@ module ActiveJob
# end
# end
#
- # The number of times a job is enqueued to a specific queue can also be asserted.
+ # Asserts the number of times a job is enqueued to a specific queue by passing +:queue+ option.
#
# def test_logging_job
# assert_enqueued_jobs 2, queue: 'default' do
@@ -117,14 +117,18 @@ module ActiveJob
# end
def assert_enqueued_jobs(number, only: nil, except: nil, queue: nil)
if block_given?
- original_count = enqueued_jobs_size(only: only, except: except, queue: queue)
+ original_count = enqueued_jobs_with(only: only, except: except, queue: queue)
+
yield
- new_count = enqueued_jobs_size(only: only, except: except, queue: queue)
- assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued"
+
+ new_count = enqueued_jobs_with(only: only, except: except, queue: queue)
+
+ actual_count = new_count - original_count
else
- actual_count = enqueued_jobs_size(only: only, except: except, queue: queue)
- assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued"
+ actual_count = enqueued_jobs_with(only: only, except: except, queue: queue)
end
+
+ assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued"
end
# Asserts that no jobs have been enqueued.
@@ -135,7 +139,7 @@ module ActiveJob
# assert_enqueued_jobs 1
# end
#
- # If a block is passed, that block should not cause any job to be enqueued.
+ # If a block is passed, asserts that the block will not cause any job to be enqueued.
#
# def test_jobs_again
# assert_no_enqueued_jobs do
@@ -143,7 +147,7 @@ module ActiveJob
# end
# end
#
- # It can be asserted that no jobs of a specific kind are enqueued:
+ # Asserts that no jobs of a specific kind are enqueued by passing +:only+ option.
#
# def test_no_logging
# assert_no_enqueued_jobs only: LoggingJob do
@@ -151,7 +155,7 @@ module ActiveJob
# end
# end
#
- # It can be asserted that no jobs except specific class are enqueued:
+ # Asserts that no jobs except specific class are enqueued by passing +:except+ option.
#
# def test_no_logging
# assert_no_enqueued_jobs except: HelloJob do
@@ -159,7 +163,7 @@ module ActiveJob
# end
# end
#
- # It can be asserted that no jobs are enqueued to a specific queue:
+ # Asserts that no jobs are enqueued to a specific queue by passing +:queue+ option
#
# def test_no_logging
# assert_no_enqueued_jobs queue: 'default' do
@@ -176,7 +180,7 @@ module ActiveJob
# Asserts that the number of performed jobs matches the given number.
# If no block is passed, <tt>perform_enqueued_jobs</tt>
- # must be called around the job call.
+ # must be called around or after the job call.
#
# def test_jobs
# assert_performed_jobs 0
@@ -186,13 +190,14 @@ module ActiveJob
# end
# assert_performed_jobs 1
#
- # perform_enqueued_jobs do
- # HelloJob.perform_later('yves')
- # assert_performed_jobs 2
- # end
+ # HelloJob.perform_later('yves')
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_jobs 2
# end
#
- # If a block is passed, that block should cause the specified number of
+ # If a block is passed, asserts that the block will cause the specified number of
# jobs to be performed.
#
# def test_jobs_again
@@ -206,7 +211,7 @@ module ActiveJob
# end
# end
#
- # The block form supports filtering. If the :only option is specified,
+ # This method also supports filtering. If the +:only+ option is specified,
# then only the listed job(s) will be performed.
#
# def test_hello_job
@@ -216,7 +221,7 @@ module ActiveJob
# end
# end
#
- # Also if the :except option is specified,
+ # Also if the +:except+ option is specified,
# then the job(s) except specific class will be performed.
#
# def test_hello_job
@@ -237,17 +242,30 @@ module ActiveJob
# end
# end
# end
- def assert_performed_jobs(number, only: nil, except: nil)
+ #
+ # If the +:queue+ option is specified,
+ # then only the job(s) enqueued to a specific queue will be performed.
+ #
+ # def test_assert_performed_jobs_with_queue_option
+ # assert_performed_jobs 1, queue: :some_queue do
+ # HelloJob.set(queue: :some_queue).perform_later("jeremy")
+ # HelloJob.set(queue: :other_queue).perform_later("bogdan")
+ # end
+ # end
+ def assert_performed_jobs(number, only: nil, except: nil, queue: nil, &block)
if block_given?
original_count = performed_jobs.size
- perform_enqueued_jobs(only: only, except: except) { yield }
+
+ perform_enqueued_jobs(only: only, except: except, queue: queue, &block)
+
new_count = performed_jobs.size
- assert_equal number, new_count - original_count,
- "#{number} jobs expected, but #{new_count - original_count} were performed"
+
+ performed_jobs_size = new_count - original_count
else
- performed_jobs_size = performed_jobs.size
- assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed"
+ performed_jobs_size = performed_jobs_with(only: only, except: except, queue: queue)
end
+
+ assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed"
end
# Asserts that no jobs have been performed.
@@ -261,7 +279,7 @@ module ActiveJob
# end
# end
#
- # If a block is passed, that block should not cause any job to be performed.
+ # If a block is passed, asserts that the block will not cause any job to be performed.
#
# def test_jobs_again
# assert_no_performed_jobs do
@@ -269,7 +287,7 @@ module ActiveJob
# end
# end
#
- # The block form supports filtering. If the :only option is specified,
+ # The block form supports filtering. If the +:only+ option is specified,
# then only the listed job(s) will not be performed.
#
# def test_no_logging
@@ -278,7 +296,7 @@ module ActiveJob
# end
# end
#
- # Also if the :except option is specified,
+ # Also if the +:except+ option is specified,
# then the job(s) except specific class will not be performed.
#
# def test_no_logging
@@ -287,11 +305,20 @@ module ActiveJob
# end
# end
#
+ # If the +:queue+ option is specified,
+ # then only the job(s) enqueued to a specific queue will not be performed.
+ #
+ # def test_assert_no_performed_jobs_with_queue_option
+ # assert_no_performed_jobs queue: :some_queue do
+ # HelloJob.set(queue: :other_queue).perform_later("jeremy")
+ # end
+ # end
+ #
# Note: This assertion is simply a shortcut for:
#
# assert_performed_jobs 0, &block
- def assert_no_performed_jobs(only: nil, except: nil, &block)
- assert_performed_jobs 0, only: only, except: except, &block
+ def assert_no_performed_jobs(only: nil, except: nil, queue: nil, &block)
+ assert_performed_jobs 0, only: only, except: except, queue: queue, &block
end
# Asserts that the job has been enqueued with the given arguments.
@@ -304,7 +331,23 @@ module ActiveJob
# assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon)
# end
#
- # If a block is passed, that block should cause the job to be
+ #
+ # The +args+ argument also accepts a proc which will get passed the actual
+ # job's arguments. Your proc needs to returns a boolean value determining if
+ # the job's arguments matches your expectation. This is useful to check only
+ # for a subset of arguments.
+ #
+ # def test_assert_enqueued_with
+ # expected_args = ->(job_args) do
+ # assert job_args.first.key?(:foo)
+ # end
+ #
+ # MyJob.perform_later(foo: 'bar', other_arg: 'No need to check in the test')
+ # assert_enqueued_with(job: MyJob, args: expected_args, queue: 'low')
+ # end
+ #
+ #
+ # If a block is passed, asserts that the block will cause the job to be
# enqueued with the given arguments.
#
# def test_assert_enqueued_with
@@ -318,7 +361,7 @@ module ActiveJob
# end
def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil)
expected = { job: job, args: args, at: at, queue: queue }.compact
- serialized_args = serialize_args_for_assertion(expected)
+ expected_args = prepare_args_for_assertion(expected)
if block_given?
original_enqueued_jobs_count = enqueued_jobs.count
@@ -331,14 +374,56 @@ module ActiveJob
end
matching_job = jobs.find do |enqueued_job|
- serialized_args.all? { |key, value| value == enqueued_job[key] }
+ deserialized_job = deserialize_args_for_assertion(enqueued_job)
+
+ expected_args.all? do |key, value|
+ if value.respond_to?(:call)
+ value.call(deserialized_job[key])
+ else
+ value == deserialized_job[key]
+ end
+ end
end
assert matching_job, "No enqueued job found with #{expected}"
instantiate_job(matching_job)
end
- # Asserts that the job passed in the block has been performed with the given arguments.
+ # Asserts that the job has been performed with the given arguments.
+ #
+ # def test_assert_performed_with
+ # MyJob.perform_later(1,2,3)
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high')
+ #
+ # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_with(job: MyJob, at: Date.tomorrow.noon)
+ # end
+ #
+ # The +args+ argument also accepts a proc which will get passed the actual
+ # job's arguments. Your proc needs to returns a boolean value determining if
+ # the job's arguments matches your expectation. This is useful to check only
+ # for a subset of arguments.
+ #
+ # def test_assert_performed_with
+ # expected_args = ->(job_args) do
+ # assert job_args.first.key?(:foo)
+ # end
+ # MyJob.perform_later(foo: 'bar', other_arg: 'No need to check in the test')
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_with(job: MyJob, args: expected_args, queue: 'high')
+ # end
+ #
+ # If a block is passed, that block performs all of the jobs that were
+ # enqueued throughout the duration of the block and asserts that
+ # the job has been performed with the given arguments in the block.
#
# def test_assert_performed_with
# assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do
@@ -349,20 +434,39 @@ module ActiveJob
# MyJob.set(wait_until: Date.tomorrow.noon).perform_later
# end
# end
- def assert_performed_with(job: nil, args: nil, at: nil, queue: nil)
- original_performed_jobs_count = performed_jobs.count
+ def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block)
expected = { job: job, args: args, at: at, queue: queue }.compact
- serialized_args = serialize_args_for_assertion(expected)
- perform_enqueued_jobs { yield }
- in_block_jobs = performed_jobs.drop(original_performed_jobs_count)
- matching_job = in_block_jobs.find do |in_block_job|
- serialized_args.all? { |key, value| value == in_block_job[key] }
+ expected_args = prepare_args_for_assertion(expected)
+
+ if block_given?
+ original_performed_jobs_count = performed_jobs.count
+
+ perform_enqueued_jobs(&block)
+
+ jobs = performed_jobs.drop(original_performed_jobs_count)
+ else
+ jobs = performed_jobs
end
+
+ matching_job = jobs.find do |enqueued_job|
+ deserialized_job = deserialize_args_for_assertion(enqueued_job)
+
+ expected_args.all? do |key, value|
+ if value.respond_to?(:call)
+ value.call(deserialized_job[key])
+ else
+ value == deserialized_job[key]
+ end
+ end
+ end
+
assert matching_job, "No performed job found with #{expected}"
instantiate_job(matching_job)
end
- # Performs all enqueued jobs in the duration of the block.
+ # Performs all enqueued jobs. If a block is given, performs all of the jobs
+ # that were enqueued throughout the duration of the block. If a block is
+ # not given, performs all of the enqueued jobs up to this point in the test.
#
# def test_perform_enqueued_jobs
# perform_enqueued_jobs do
@@ -371,6 +475,14 @@ module ActiveJob
# assert_performed_jobs 1
# end
#
+ # def test_perform_enqueued_jobs_without_block
+ # MyJob.perform_later(1, 2, 3)
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_jobs 1
+ # end
+ #
# This method also supports filtering. If the +:only+ option is specified,
# then only the listed job(s) will be performed.
#
@@ -393,24 +505,42 @@ module ActiveJob
# assert_performed_jobs 1
# end
#
- def perform_enqueued_jobs(only: nil, except: nil)
+ # If the +:queue+ option is specified,
+ # then only the job(s) enqueued to a specific queue will be performed.
+ #
+ # def test_perform_enqueued_jobs_with_queue
+ # perform_enqueued_jobs queue: :some_queue do
+ # MyJob.set(queue: :some_queue).perform_later(1, 2, 3) # will be performed
+ # HelloJob.set(queue: :other_queue).perform_later(1, 2, 3) # will not be performed
+ # end
+ # assert_performed_jobs 1
+ # end
+ #
+ def perform_enqueued_jobs(only: nil, except: nil, queue: nil)
+ return flush_enqueued_jobs(only: only, except: except, queue: queue) unless block_given?
+
validate_option(only: only, except: except)
+
old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs
old_filter = queue_adapter.filter
old_reject = queue_adapter.reject
+ old_queue = queue_adapter.queue
begin
queue_adapter.perform_enqueued_jobs = true
queue_adapter.perform_enqueued_at_jobs = true
queue_adapter.filter = only
queue_adapter.reject = except
+ queue_adapter.queue = queue
+
yield
ensure
queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs
queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs
queue_adapter.filter = old_filter
queue_adapter.reject = old_reject
+ queue_adapter.queue = old_queue
end
end
@@ -432,31 +562,58 @@ module ActiveJob
performed_jobs.clear
end
- def enqueued_jobs_size(only: nil, except: nil, queue: nil)
+ def jobs_with(jobs, only: nil, except: nil, queue: nil)
validate_option(only: only, except: except)
- enqueued_jobs.count do |job|
+
+ jobs.count do |job|
job_class = job.fetch(:job)
+
if only
next false unless Array(only).include?(job_class)
elsif except
next false if Array(except).include?(job_class)
end
+
if queue
next false unless queue.to_s == job.fetch(:queue, job_class.queue_name)
end
+
+ yield job if block_given?
+
true
end
end
- def serialize_args_for_assertion(args)
- args.dup.tap do |serialized_args|
- serialized_args[:args] = ActiveJob::Arguments.serialize(serialized_args[:args]) if serialized_args[:args]
- serialized_args[:at] = serialized_args[:at].to_f if serialized_args[:at]
+ def enqueued_jobs_with(only: nil, except: nil, queue: nil, &block)
+ jobs_with(enqueued_jobs, only: only, except: except, queue: queue, &block)
+ end
+
+ def performed_jobs_with(only: nil, except: nil, queue: nil, &block)
+ jobs_with(performed_jobs, only: only, except: except, queue: queue, &block)
+ end
+
+ def flush_enqueued_jobs(only: nil, except: nil, queue: nil)
+ enqueued_jobs_with(only: only, except: except, queue: queue) do |payload|
+ instantiate_job(payload).perform_now
+ queue_adapter.performed_jobs << payload
+ end
+ end
+
+ def prepare_args_for_assertion(args)
+ args.dup.tap do |arguments|
+ arguments[:at] = arguments[:at].to_f if arguments[:at]
+ end
+ end
+
+ def deserialize_args_for_assertion(job)
+ job.dup.tap do |new_job|
+ new_job[:args] = ActiveJob::Arguments.deserialize(new_job[:args]) if new_job[:args]
end
end
def instantiate_job(payload)
- job = payload[:job].new(*payload[:args])
+ args = ActiveJob::Arguments.deserialize(payload[:args])
+ job = payload[:job].new(*args)
job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at)
job.queue_name = payload[:queue]
job