diff options
Diffstat (limited to 'activejob/lib')
-rw-r--r-- | activejob/lib/active_job/arguments.rb | 33 | ||||
-rw-r--r-- | activejob/lib/active_job/core.rb | 40 | ||||
-rw-r--r-- | activejob/lib/active_job/exceptions.rb | 29 | ||||
-rw-r--r-- | activejob/lib/active_job/execution.rb | 6 | ||||
-rw-r--r-- | activejob/lib/active_job/logging.rb | 32 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapter.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/async_adapter.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/backburner_adapter.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/test_adapter.rb | 20 | ||||
-rw-r--r-- | activejob/lib/active_job/railtie.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/test_helper.rb | 263 |
12 files changed, 326 insertions, 111 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb index 86bb0c5540..a86a82c6e3 100644 --- a/activejob/lib/active_job/arguments.rb +++ b/activejob/lib/active_job/arguments.rb @@ -24,18 +24,20 @@ module ActiveJob module Arguments extend self # :nodoc: - TYPE_WHITELIST = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ] + PERMITTED_TYPES = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ] - # Serializes a set of arguments. Whitelisted types are returned - # as-is. Arrays/Hashes are serialized element by element. - # All other types are serialized using GlobalID. + # Serializes a set of arguments. Intrinsic types that can safely be + # serialized without mutation are returned as-is. Arrays/Hashes are + # serialized element by element. All other types are serialized using + # GlobalID. def serialize(arguments) arguments.map { |argument| serialize_argument(argument) } end - # Deserializes a set of arguments. Whitelisted types are returned - # as-is. Arrays/Hashes are deserialized element by element. - # All other types are deserialized using GlobalID. + # Deserializes a set of arguments. Intrinsic types that can safely be + # deserialized without mutation are returned as-is. Arrays/Hashes are + # deserialized element by element. All other types are deserialized using + # GlobalID. def deserialize(arguments) arguments.map { |argument| deserialize_argument(argument) } rescue @@ -45,11 +47,11 @@ module ActiveJob private # :nodoc: - GLOBALID_KEY = "_aj_globalid".freeze + GLOBALID_KEY = "_aj_globalid" # :nodoc: - SYMBOL_KEYS_KEY = "_aj_symbol_keys".freeze + SYMBOL_KEYS_KEY = "_aj_symbol_keys" # :nodoc: - WITH_INDIFFERENT_ACCESS_KEY = "_aj_hash_with_indifferent_access".freeze + WITH_INDIFFERENT_ACCESS_KEY = "_aj_hash_with_indifferent_access" # :nodoc: OBJECT_SERIALIZER_KEY = "_aj_serialized" @@ -60,11 +62,11 @@ module ActiveJob OBJECT_SERIALIZER_KEY, OBJECT_SERIALIZER_KEY.to_sym, WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym, ] - private_constant :RESERVED_KEYS + private_constant :RESERVED_KEYS, :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY def serialize_argument(argument) case argument - when *TYPE_WHITELIST + when *PERMITTED_TYPES argument when GlobalID::Identification convert_to_global_id_hash(argument) @@ -88,7 +90,7 @@ module ActiveJob case argument when String GlobalID::Locator.locate(argument) || argument - when *TYPE_WHITELIST + when *PERMITTED_TYPES argument when Array argument.map { |arg| deserialize_argument(arg) } @@ -145,7 +147,10 @@ module ActiveJob end def transform_symbol_keys(hash, symbol_keys) - hash.transform_keys do |key| + # NOTE: HashWithIndifferentAccess#transform_keys always + # returns stringified keys with indifferent access + # so we call #to_h here to ensure keys are symbolized. + hash.to_h.transform_keys do |key| if symbol_keys.include?(key) key.to_sym else diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index 61d402cfca..62bb5861bb 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -6,35 +6,33 @@ module ActiveJob module Core extend ActiveSupport::Concern - included do - # Job arguments - attr_accessor :arguments - attr_writer :serialized_arguments + # Job arguments + attr_accessor :arguments + attr_writer :serialized_arguments - # Timestamp when the job should be performed - attr_accessor :scheduled_at + # Timestamp when the job should be performed + attr_accessor :scheduled_at - # Job Identifier - attr_accessor :job_id + # Job Identifier + attr_accessor :job_id - # Queue in which the job will reside. - attr_writer :queue_name + # Queue in which the job will reside. + attr_writer :queue_name - # Priority that the job will have (lower is more priority). - attr_writer :priority + # Priority that the job will have (lower is more priority). + attr_writer :priority - # ID optionally provided by adapter - attr_accessor :provider_job_id + # ID optionally provided by adapter + attr_accessor :provider_job_id - # Number of times this job has been executed (which increments on every retry, like after an exception). - attr_accessor :executions + # Number of times this job has been executed (which increments on every retry, like after an exception). + attr_accessor :executions - # I18n.locale to be used during the job. - attr_accessor :locale + # I18n.locale to be used during the job. + attr_accessor :locale - # Timezone to be used during the job. - attr_accessor :timezone - end + # Timezone to be used during the job. + attr_accessor :timezone # These methods will be included into any Active Job object, adding # helpers for de/serialization and creation of job instances. diff --git a/activejob/lib/active_job/exceptions.rb b/activejob/lib/active_job/exceptions.rb index 1e57dbcb1c..bc9e168971 100644 --- a/activejob/lib/active_job/exceptions.rb +++ b/activejob/lib/active_job/exceptions.rb @@ -9,6 +9,7 @@ module ActiveJob module ClassMethods # Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts. + # The number of attempts includes the total executions of a job, not just the retried executions. # If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to # bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a # holding queue for inspection. @@ -21,7 +22,8 @@ module ActiveJob # as a computing proc that the number of executions so far as an argument, or as a symbol reference of # <tt>:exponentially_longer</tt>, which applies the wait algorithm of <tt>(executions ** 4) + 2</tt> # (first wait 3s, then 18s, then 83s, etc) - # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts) + # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts), + # attempts here refers to the total number of times the job is executed, not just retried executions # * <tt>:queue</tt> - Re-enqueues the job on a different queue # * <tt>:priority</tt> - Re-enqueues the job with a different priority # @@ -45,13 +47,14 @@ module ActiveJob def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil) rescue_from(*exceptions) do |error| if executions < attempts - logger.error "Retrying #{self.class} in #{wait} seconds, due to a #{error.class}. The original exception was #{error.cause.inspect}." - retry_job wait: determine_delay(wait), queue: queue, priority: priority + retry_job wait: determine_delay(wait), queue: queue, priority: priority, error: error else if block_given? - yield self, error + instrument :retry_stopped, error: error do + yield self, error + end else - logger.error "Stopped retrying #{self.class} due to a #{error.class}, which reoccurred on #{executions} attempts. The original exception was #{error.cause.inspect}." + instrument :retry_stopped, error: error raise error end end @@ -78,10 +81,8 @@ module ActiveJob # end def discard_on(*exceptions) rescue_from(*exceptions) do |error| - if block_given? - yield self, error - else - logger.error "Discarded #{self.class} due to a #{error.class}. The original exception was #{error.cause.inspect}." + instrument :discard, error: error do + yield self, error if block_given? end end end @@ -109,7 +110,9 @@ module ActiveJob # end # end def retry_job(options = {}) - enqueue options + instrument :enqueue_retry, options.slice(:error, :wait) do + enqueue options + end end private @@ -130,5 +133,11 @@ module ActiveJob raise "Couldn't determine a delay based on #{seconds_or_duration_or_algorithm.inspect}" end end + + def instrument(name, error: nil, wait: nil, &block) + payload = { job: self, adapter: self.class.queue_adapter, error: error, wait: wait } + + ActiveSupport::Notifications.instrument("#{name}.active_job", payload, &block) + end end end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index d75be376ec..f5a343311f 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -31,11 +31,11 @@ module ActiveJob # # MyJob.new(*args).perform_now def perform_now + # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters + self.executions = (executions || 0) + 1 + deserialize_arguments_if_needed run_callbacks :perform do - # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters - self.executions = (executions || 0) + 1 - perform(*arguments) end rescue => exception diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index 9ffd60ad53..416be83c24 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -88,6 +88,38 @@ module ActiveJob end end + def enqueue_retry(event) + job = event.payload[:job] + ex = event.payload[:error] + wait = event.payload[:wait] + + info do + if ex + "Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}." + else + "Retrying #{job.class} in #{wait.to_i} seconds." + end + end + end + + def retry_stopped(event) + job = event.payload[:job] + ex = event.payload[:error] + + error do + "Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts." + end + end + + def discard(event) + job = event.payload[:job] + ex = event.payload[:error] + + error do + "Discarded #{job.class} due to a #{ex.class}." + end + end + private def queue_name(event) event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})" diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb index 006a683b85..954bfd1dd1 100644 --- a/activejob/lib/active_job/queue_adapter.rb +++ b/activejob/lib/active_job/queue_adapter.rb @@ -22,6 +22,8 @@ module ActiveJob _queue_adapter end + # Returns string denoting the name of the configured queue adapter. + # By default returns +"async"+. def queue_adapter_name _queue_adapter_name end diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb index 00c7b407b1..3e3a474fbb 100644 --- a/activejob/lib/active_job/queue_adapters.rb +++ b/activejob/lib/active_job/queue_adapters.rb @@ -121,7 +121,7 @@ module ActiveJob autoload :SuckerPunchAdapter autoload :TestAdapter - ADAPTER = "Adapter".freeze + ADAPTER = "Adapter" private_constant :ADAPTER class << self diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb index ebf6f384e3..53a7e3d53e 100644 --- a/activejob/lib/active_job/queue_adapters/async_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -31,7 +31,7 @@ module ActiveJob # jobs. Since jobs share a single thread pool, long-running jobs will block # short-lived jobs. Fine for dev/test; bad for production. class AsyncAdapter - # See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options. + # See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html] for executor options. def initialize(**executor_options) @scheduler = Scheduler.new(**executor_options) end diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb index 0ba93c6e0b..7dc49310ac 100644 --- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -16,12 +16,12 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :backburner class BackburnerAdapter def enqueue(job) #:nodoc: - Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name + Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority) end def enqueue_at(job, timestamp) #:nodoc: delay = timestamp - Time.current.to_f - Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay + Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority, delay: delay) end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb index 885f9ff01c..f73ad444ba 100644 --- a/activejob/lib/active_job/queue_adapters/test_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -12,7 +12,7 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :test class TestAdapter - attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject) + attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject, :queue) attr_writer(:enqueued_jobs, :performed_jobs) # Provides a store of all the enqueued jobs with the TestAdapter so you can check them. @@ -29,14 +29,14 @@ module ActiveJob return if filtered?(job) job_data = job_to_hash(job) - enqueue_or_perform(perform_enqueued_jobs, job, job_data) + perform_or_enqueue(perform_enqueued_jobs, job, job_data) end def enqueue_at(job, timestamp) #:nodoc: return if filtered?(job) job_data = job_to_hash(job, at: timestamp) - enqueue_or_perform(perform_enqueued_at_jobs, job, job_data) + perform_or_enqueue(perform_enqueued_at_jobs, job, job_data) end private @@ -44,7 +44,7 @@ module ActiveJob { job: job.class, args: job.serialize.fetch("arguments"), queue: job.queue_name }.merge!(extras) end - def enqueue_or_perform(perform, job, job_data) + def perform_or_enqueue(perform, job, job_data) if perform performed_jobs << job_data Base.execute job.serialize @@ -54,12 +54,20 @@ module ActiveJob end def filtered?(job) + filtered_queue?(job) || filtered_job_class?(job) + end + + def filtered_queue?(job) + if queue + job.queue_name != queue.to_s + end + end + + def filtered_job_class?(job) if filter !Array(filter).include?(job.class) elsif reject Array(reject).include?(job.class) - else - false end end end diff --git a/activejob/lib/active_job/railtie.rb b/activejob/lib/active_job/railtie.rb index d0294854d3..ecc0908d5f 100644 --- a/activejob/lib/active_job/railtie.rb +++ b/activejob/lib/active_job/railtie.rb @@ -30,6 +30,10 @@ module ActiveJob send(k, v) if respond_to? k end end + + ActiveSupport.on_load(:action_dispatch_integration_test) do + include ActiveJob::TestHelper + end end initializer "active_job.set_reloader_hook" do |app| diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb index 04cde28a96..0deb68d0d2 100644 --- a/activejob/lib/active_job/test_helper.rb +++ b/activejob/lib/active_job/test_helper.rb @@ -52,7 +52,7 @@ module ActiveJob queue_adapter_changed_jobs.each { |klass| klass.disable_test_adapter } end - # Specifies the queue adapter to use with all active job test helpers. + # Specifies the queue adapter to use with all Active Job test helpers. # # Returns an instance of the queue adapter and defaults to # <tt>ActiveJob::QueueAdapters::TestAdapter</tt>. @@ -75,7 +75,7 @@ module ActiveJob # assert_enqueued_jobs 2 # end # - # If a block is passed, that block will cause the specified number of + # If a block is passed, asserts that the block will cause the specified number of # jobs to be enqueued. # # def test_jobs_again @@ -89,7 +89,7 @@ module ActiveJob # end # end # - # The number of times a specific job was enqueued can be asserted. + # Asserts the number of times a specific job was enqueued by passing +:only+ option. # # def test_logging_job # assert_enqueued_jobs 1, only: LoggingJob do @@ -98,7 +98,7 @@ module ActiveJob # end # end # - # The number of times a job except specific class was enqueued can be asserted. + # Asserts the number of times a job except specific class was enqueued by passing +:except+ option. # # def test_logging_job # assert_enqueued_jobs 1, except: HelloJob do @@ -107,7 +107,7 @@ module ActiveJob # end # end # - # The number of times a job is enqueued to a specific queue can also be asserted. + # Asserts the number of times a job is enqueued to a specific queue by passing +:queue+ option. # # def test_logging_job # assert_enqueued_jobs 2, queue: 'default' do @@ -117,14 +117,18 @@ module ActiveJob # end def assert_enqueued_jobs(number, only: nil, except: nil, queue: nil) if block_given? - original_count = enqueued_jobs_size(only: only, except: except, queue: queue) + original_count = enqueued_jobs_with(only: only, except: except, queue: queue) + yield - new_count = enqueued_jobs_size(only: only, except: except, queue: queue) - assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued" + + new_count = enqueued_jobs_with(only: only, except: except, queue: queue) + + actual_count = new_count - original_count else - actual_count = enqueued_jobs_size(only: only, except: except, queue: queue) - assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued" + actual_count = enqueued_jobs_with(only: only, except: except, queue: queue) end + + assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued" end # Asserts that no jobs have been enqueued. @@ -135,7 +139,7 @@ module ActiveJob # assert_enqueued_jobs 1 # end # - # If a block is passed, that block should not cause any job to be enqueued. + # If a block is passed, asserts that the block will not cause any job to be enqueued. # # def test_jobs_again # assert_no_enqueued_jobs do @@ -143,7 +147,7 @@ module ActiveJob # end # end # - # It can be asserted that no jobs of a specific kind are enqueued: + # Asserts that no jobs of a specific kind are enqueued by passing +:only+ option. # # def test_no_logging # assert_no_enqueued_jobs only: LoggingJob do @@ -151,7 +155,7 @@ module ActiveJob # end # end # - # It can be asserted that no jobs except specific class are enqueued: + # Asserts that no jobs except specific class are enqueued by passing +:except+ option. # # def test_no_logging # assert_no_enqueued_jobs except: HelloJob do @@ -159,7 +163,7 @@ module ActiveJob # end # end # - # It can be asserted that no jobs are enqueued to a specific queue: + # Asserts that no jobs are enqueued to a specific queue by passing +:queue+ option # # def test_no_logging # assert_no_enqueued_jobs queue: 'default' do @@ -176,7 +180,7 @@ module ActiveJob # Asserts that the number of performed jobs matches the given number. # If no block is passed, <tt>perform_enqueued_jobs</tt> - # must be called around the job call. + # must be called around or after the job call. # # def test_jobs # assert_performed_jobs 0 @@ -186,13 +190,14 @@ module ActiveJob # end # assert_performed_jobs 1 # - # perform_enqueued_jobs do - # HelloJob.perform_later('yves') - # assert_performed_jobs 2 - # end + # HelloJob.perform_later('yves') + # + # perform_enqueued_jobs + # + # assert_performed_jobs 2 # end # - # If a block is passed, that block should cause the specified number of + # If a block is passed, asserts that the block will cause the specified number of # jobs to be performed. # # def test_jobs_again @@ -206,7 +211,7 @@ module ActiveJob # end # end # - # The block form supports filtering. If the :only option is specified, + # This method also supports filtering. If the +:only+ option is specified, # then only the listed job(s) will be performed. # # def test_hello_job @@ -216,7 +221,7 @@ module ActiveJob # end # end # - # Also if the :except option is specified, + # Also if the +:except+ option is specified, # then the job(s) except specific class will be performed. # # def test_hello_job @@ -237,17 +242,30 @@ module ActiveJob # end # end # end - def assert_performed_jobs(number, only: nil, except: nil) + # + # If the +:queue+ option is specified, + # then only the job(s) enqueued to a specific queue will be performed. + # + # def test_assert_performed_jobs_with_queue_option + # assert_performed_jobs 1, queue: :some_queue do + # HelloJob.set(queue: :some_queue).perform_later("jeremy") + # HelloJob.set(queue: :other_queue).perform_later("bogdan") + # end + # end + def assert_performed_jobs(number, only: nil, except: nil, queue: nil, &block) if block_given? original_count = performed_jobs.size - perform_enqueued_jobs(only: only, except: except) { yield } + + perform_enqueued_jobs(only: only, except: except, queue: queue, &block) + new_count = performed_jobs.size - assert_equal number, new_count - original_count, - "#{number} jobs expected, but #{new_count - original_count} were performed" + + performed_jobs_size = new_count - original_count else - performed_jobs_size = performed_jobs.size - assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed" + performed_jobs_size = performed_jobs_with(only: only, except: except, queue: queue) end + + assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed" end # Asserts that no jobs have been performed. @@ -261,7 +279,7 @@ module ActiveJob # end # end # - # If a block is passed, that block should not cause any job to be performed. + # If a block is passed, asserts that the block will not cause any job to be performed. # # def test_jobs_again # assert_no_performed_jobs do @@ -269,7 +287,7 @@ module ActiveJob # end # end # - # The block form supports filtering. If the :only option is specified, + # The block form supports filtering. If the +:only+ option is specified, # then only the listed job(s) will not be performed. # # def test_no_logging @@ -278,7 +296,7 @@ module ActiveJob # end # end # - # Also if the :except option is specified, + # Also if the +:except+ option is specified, # then the job(s) except specific class will not be performed. # # def test_no_logging @@ -287,11 +305,20 @@ module ActiveJob # end # end # + # If the +:queue+ option is specified, + # then only the job(s) enqueued to a specific queue will not be performed. + # + # def test_assert_no_performed_jobs_with_queue_option + # assert_no_performed_jobs queue: :some_queue do + # HelloJob.set(queue: :other_queue).perform_later("jeremy") + # end + # end + # # Note: This assertion is simply a shortcut for: # # assert_performed_jobs 0, &block - def assert_no_performed_jobs(only: nil, except: nil, &block) - assert_performed_jobs 0, only: only, except: except, &block + def assert_no_performed_jobs(only: nil, except: nil, queue: nil, &block) + assert_performed_jobs 0, only: only, except: except, queue: queue, &block end # Asserts that the job has been enqueued with the given arguments. @@ -304,7 +331,23 @@ module ActiveJob # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) # end # - # If a block is passed, that block should cause the job to be + # + # The +args+ argument also accepts a proc which will get passed the actual + # job's arguments. Your proc needs to returns a boolean value determining if + # the job's arguments matches your expectation. This is useful to check only + # for a subset of arguments. + # + # def test_assert_enqueued_with + # expected_args = ->(job_args) do + # assert job_args.first.key?(:foo) + # end + # + # MyJob.perform_later(foo: 'bar', other_arg: 'No need to check in the test') + # assert_enqueued_with(job: MyJob, args: expected_args, queue: 'low') + # end + # + # + # If a block is passed, asserts that the block will cause the job to be # enqueued with the given arguments. # # def test_assert_enqueued_with @@ -318,7 +361,7 @@ module ActiveJob # end def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil) expected = { job: job, args: args, at: at, queue: queue }.compact - serialized_args = serialize_args_for_assertion(expected) + expected_args = prepare_args_for_assertion(expected) if block_given? original_enqueued_jobs_count = enqueued_jobs.count @@ -331,14 +374,56 @@ module ActiveJob end matching_job = jobs.find do |enqueued_job| - serialized_args.all? { |key, value| value == enqueued_job[key] } + deserialized_job = deserialize_args_for_assertion(enqueued_job) + + expected_args.all? do |key, value| + if value.respond_to?(:call) + value.call(deserialized_job[key]) + else + value == deserialized_job[key] + end + end end assert matching_job, "No enqueued job found with #{expected}" instantiate_job(matching_job) end - # Asserts that the job passed in the block has been performed with the given arguments. + # Asserts that the job has been performed with the given arguments. + # + # def test_assert_performed_with + # MyJob.perform_later(1,2,3) + # + # perform_enqueued_jobs + # + # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') + # + # MyJob.set(wait_until: Date.tomorrow.noon).perform_later + # + # perform_enqueued_jobs + # + # assert_performed_with(job: MyJob, at: Date.tomorrow.noon) + # end + # + # The +args+ argument also accepts a proc which will get passed the actual + # job's arguments. Your proc needs to returns a boolean value determining if + # the job's arguments matches your expectation. This is useful to check only + # for a subset of arguments. + # + # def test_assert_performed_with + # expected_args = ->(job_args) do + # assert job_args.first.key?(:foo) + # end + # MyJob.perform_later(foo: 'bar', other_arg: 'No need to check in the test') + # + # perform_enqueued_jobs + # + # assert_performed_with(job: MyJob, args: expected_args, queue: 'high') + # end + # + # If a block is passed, that block performs all of the jobs that were + # enqueued throughout the duration of the block and asserts that + # the job has been performed with the given arguments in the block. # # def test_assert_performed_with # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do @@ -349,20 +434,39 @@ module ActiveJob # MyJob.set(wait_until: Date.tomorrow.noon).perform_later # end # end - def assert_performed_with(job: nil, args: nil, at: nil, queue: nil) - original_performed_jobs_count = performed_jobs.count + def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block) expected = { job: job, args: args, at: at, queue: queue }.compact - serialized_args = serialize_args_for_assertion(expected) - perform_enqueued_jobs { yield } - in_block_jobs = performed_jobs.drop(original_performed_jobs_count) - matching_job = in_block_jobs.find do |in_block_job| - serialized_args.all? { |key, value| value == in_block_job[key] } + expected_args = prepare_args_for_assertion(expected) + + if block_given? + original_performed_jobs_count = performed_jobs.count + + perform_enqueued_jobs(&block) + + jobs = performed_jobs.drop(original_performed_jobs_count) + else + jobs = performed_jobs end + + matching_job = jobs.find do |enqueued_job| + deserialized_job = deserialize_args_for_assertion(enqueued_job) + + expected_args.all? do |key, value| + if value.respond_to?(:call) + value.call(deserialized_job[key]) + else + value == deserialized_job[key] + end + end + end + assert matching_job, "No performed job found with #{expected}" instantiate_job(matching_job) end - # Performs all enqueued jobs in the duration of the block. + # Performs all enqueued jobs. If a block is given, performs all of the jobs + # that were enqueued throughout the duration of the block. If a block is + # not given, performs all of the enqueued jobs up to this point in the test. # # def test_perform_enqueued_jobs # perform_enqueued_jobs do @@ -371,6 +475,14 @@ module ActiveJob # assert_performed_jobs 1 # end # + # def test_perform_enqueued_jobs_without_block + # MyJob.perform_later(1, 2, 3) + # + # perform_enqueued_jobs + # + # assert_performed_jobs 1 + # end + # # This method also supports filtering. If the +:only+ option is specified, # then only the listed job(s) will be performed. # @@ -393,24 +505,42 @@ module ActiveJob # assert_performed_jobs 1 # end # - def perform_enqueued_jobs(only: nil, except: nil) + # If the +:queue+ option is specified, + # then only the job(s) enqueued to a specific queue will be performed. + # + # def test_perform_enqueued_jobs_with_queue + # perform_enqueued_jobs queue: :some_queue do + # MyJob.set(queue: :some_queue).perform_later(1, 2, 3) # will be performed + # HelloJob.set(queue: :other_queue).perform_later(1, 2, 3) # will not be performed + # end + # assert_performed_jobs 1 + # end + # + def perform_enqueued_jobs(only: nil, except: nil, queue: nil) + return flush_enqueued_jobs(only: only, except: except, queue: queue) unless block_given? + validate_option(only: only, except: except) + old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs old_filter = queue_adapter.filter old_reject = queue_adapter.reject + old_queue = queue_adapter.queue begin queue_adapter.perform_enqueued_jobs = true queue_adapter.perform_enqueued_at_jobs = true queue_adapter.filter = only queue_adapter.reject = except + queue_adapter.queue = queue + yield ensure queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs queue_adapter.filter = old_filter queue_adapter.reject = old_reject + queue_adapter.queue = old_queue end end @@ -432,31 +562,58 @@ module ActiveJob performed_jobs.clear end - def enqueued_jobs_size(only: nil, except: nil, queue: nil) + def jobs_with(jobs, only: nil, except: nil, queue: nil) validate_option(only: only, except: except) - enqueued_jobs.count do |job| + + jobs.count do |job| job_class = job.fetch(:job) + if only next false unless Array(only).include?(job_class) elsif except next false if Array(except).include?(job_class) end + if queue next false unless queue.to_s == job.fetch(:queue, job_class.queue_name) end + + yield job if block_given? + true end end - def serialize_args_for_assertion(args) - args.dup.tap do |serialized_args| - serialized_args[:args] = ActiveJob::Arguments.serialize(serialized_args[:args]) if serialized_args[:args] - serialized_args[:at] = serialized_args[:at].to_f if serialized_args[:at] + def enqueued_jobs_with(only: nil, except: nil, queue: nil, &block) + jobs_with(enqueued_jobs, only: only, except: except, queue: queue, &block) + end + + def performed_jobs_with(only: nil, except: nil, queue: nil, &block) + jobs_with(performed_jobs, only: only, except: except, queue: queue, &block) + end + + def flush_enqueued_jobs(only: nil, except: nil, queue: nil) + enqueued_jobs_with(only: only, except: except, queue: queue) do |payload| + instantiate_job(payload).perform_now + queue_adapter.performed_jobs << payload + end + end + + def prepare_args_for_assertion(args) + args.dup.tap do |arguments| + arguments[:at] = arguments[:at].to_f if arguments[:at] + end + end + + def deserialize_args_for_assertion(job) + job.dup.tap do |new_job| + new_job[:args] = ActiveJob::Arguments.deserialize(new_job[:args]) if new_job[:args] end end def instantiate_job(payload) - job = payload[:job].new(*payload[:args]) + args = ActiveJob::Arguments.deserialize(payload[:args]) + job = payload[:job].new(*args) job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at) job.queue_name = payload[:queue] job |