diff options
Diffstat (limited to 'activejob/lib/active_job')
18 files changed, 391 insertions, 166 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb index 752be6898e..cdb37879b6 100644 --- a/activejob/lib/active_job/arguments.rb +++ b/activejob/lib/active_job/arguments.rb @@ -1,10 +1,12 @@ -require 'active_support/core_ext/hash/indifferent_access' +require 'active_support/core_ext/hash' module ActiveJob # Raised when an exception is raised during job arguments deserialization. # # Wraps the original exception raised as +original_exception+. class DeserializationError < StandardError + # The original exception that was raised during deserialization of job + # arguments. attr_reader :original_exception def initialize(e) #:nodoc: @@ -24,6 +26,7 @@ module ActiveJob module Arguments extend self + # :nodoc: TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ] # Serializes a set of arguments. Whitelisted types are returned @@ -43,8 +46,13 @@ module ActiveJob end private + # :nodoc: GLOBALID_KEY = '_aj_globalid'.freeze - private_constant :GLOBALID_KEY + # :nodoc: + SYMBOL_KEYS_KEY = '_aj_symbol_keys'.freeze + # :nodoc: + WITH_INDIFFERENT_ACCESS_KEY = '_aj_hash_with_indifferent_access'.freeze + private_constant :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY def serialize_argument(argument) case argument @@ -54,10 +62,15 @@ module ActiveJob { GLOBALID_KEY => argument.to_global_id.to_s } when Array argument.map { |arg| serialize_argument(arg) } + when ActiveSupport::HashWithIndifferentAccess + result = serialize_hash(argument) + result[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true) + result when Hash - argument.each_with_object({}) do |(key, value), hash| - hash[serialize_hash_key(key)] = serialize_argument(value) - end + symbol_keys = argument.each_key.grep(Symbol).map(&:to_s) + result = serialize_hash(argument) + result[SYMBOL_KEYS_KEY] = symbol_keys + result else raise SerializationError.new("Unsupported argument type: #{argument.class.name}") end @@ -75,7 +88,7 @@ module ActiveJob if serialized_global_id?(argument) deserialize_global_id argument else - deserialize_hash argument + deserialize_hash(argument) end else raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}" @@ -90,13 +103,28 @@ module ActiveJob GlobalID::Locator.locate hash[GLOBALID_KEY] end + def serialize_hash(argument) + argument.each_with_object({}) do |(key, value), hash| + hash[serialize_hash_key(key)] = serialize_argument(value) + end + end + def deserialize_hash(serialized_hash) - serialized_hash.each_with_object({}.with_indifferent_access) do |(key, value), hash| - hash[key] = deserialize_argument(value) + result = serialized_hash.transform_values { |v| deserialize_argument(v) } + if result.delete(WITH_INDIFFERENT_ACCESS_KEY) + result = result.with_indifferent_access + elsif symbol_keys = result.delete(SYMBOL_KEYS_KEY) + result = transform_symbol_keys(result, symbol_keys) end + result end - RESERVED_KEYS = [GLOBALID_KEY, GLOBALID_KEY.to_sym] + # :nodoc: + RESERVED_KEYS = [ + GLOBALID_KEY, GLOBALID_KEY.to_sym, + SYMBOL_KEYS_KEY, SYMBOL_KEYS_KEY.to_sym, + WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym, + ] private_constant :RESERVED_KEYS def serialize_hash_key(key) @@ -109,5 +137,15 @@ module ActiveJob raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}") end end + + def transform_symbol_keys(hash, symbol_keys) + hash.transform_keys do |key| + if symbol_keys.include?(key) + key.to_sym + else + key + end + end + end end end diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index ddd7d1361c..0528572cd0 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -1,4 +1,6 @@ module ActiveJob + # Provides general behavior that will be included into every Active Job + # object that inherits from ActiveJob::Base. module Core extend ActiveSupport::Concern @@ -15,6 +17,9 @@ module ActiveJob # Queue in which the job will reside. attr_writer :queue_name + + # ID optionally provided by adapter + attr_accessor :provider_job_id end # These methods will be included into any Active Job object, adding diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 430c17e1bf..98d92385dd 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -1,6 +1,7 @@ require 'active_job/arguments' module ActiveJob + # Provides behavior for enqueuing and retrying jobs. module Enqueuing extend ActiveSupport::Concern diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index cd29e6908e..54774db601 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -81,7 +81,7 @@ module ActiveJob private def queue_name(event) - event.payload[:adapter].name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" + event.payload[:adapter].class.name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})" end def args_info(job) diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb index d610d30e01..457015b741 100644 --- a/activejob/lib/active_job/queue_adapter.rb +++ b/activejob/lib/active_job/queue_adapter.rb @@ -1,35 +1,63 @@ require 'active_job/queue_adapters/inline_adapter' +require 'active_support/core_ext/class/attribute' require 'active_support/core_ext/string/inflections' module ActiveJob # The <tt>ActiveJob::QueueAdapter</tt> module is used to load the - # correct adapter. The default queue adapter is the :inline queue. + # correct adapter. The default queue adapter is the +:inline+ queue. module QueueAdapter #:nodoc: extend ActiveSupport::Concern + included do + class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false + self.queue_adapter = :inline + end + # Includes the setter method for changing the active queue adapter. module ClassMethods - mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } + # Returns the backend queue provider. The default queue adapter + # is the +:inline+ queue. See QueueAdapters for more information. + def queue_adapter + _queue_adapter + end # Specify the backend queue provider. The default queue adapter - # is the :inline queue. See QueueAdapters for more + # is the +:inline+ queue. See QueueAdapters for more # information. - def queue_adapter=(name_or_adapter) - @@queue_adapter = \ - case name_or_adapter - when :test - ActiveJob::QueueAdapters::TestAdapter.new - when Symbol, String - load_adapter(name_or_adapter) - else - name_or_adapter if name_or_adapter.respond_to?(:enqueue) - end + def queue_adapter=(name_or_adapter_or_class) + self._queue_adapter = interpret_adapter(name_or_adapter_or_class) end private - def load_adapter(name) - "ActiveJob::QueueAdapters::#{name.to_s.camelize}Adapter".constantize + + def interpret_adapter(name_or_adapter_or_class) + case name_or_adapter_or_class + when Symbol, String + ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new + else + if queue_adapter?(name_or_adapter_or_class) + name_or_adapter_or_class + elsif queue_adapter_class?(name_or_adapter_or_class) + ActiveSupport::Deprecation.warn "Passing an adapter class is deprecated " \ + "and will be removed in Rails 5.1. Please pass an adapter name " \ + "(.queue_adapter = :#{name_or_adapter_or_class.name.demodulize.remove('Adapter').underscore}) " \ + "or an instance (.queue_adapter = #{name_or_adapter_or_class.name}.new) instead." + name_or_adapter_or_class.new + else + raise ArgumentError + end end + end + + QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze + + def queue_adapter?(object) + QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) } + end + + def queue_adapter_class?(object) + object.is_a?(Class) && QUEUE_ADAPTER_METHODS.all? { |meth| object.public_method_defined?(meth) } + end end end end diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb index 4b91c93dbe..1335e3236e 100644 --- a/activejob/lib/active_job/queue_adapters.rb +++ b/activejob/lib/active_job/queue_adapters.rb @@ -27,13 +27,76 @@ module ActiveJob # | Sneakers | Yes | Yes | No | Queue | Queue | No | # | Sucker Punch | Yes | Yes | No | No | No | No | # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A | - # | Active Job | Yes | Yes | Yes | No | No | No | + # + # ==== Async + # + # Yes: The Queue Adapter runs the jobs in a separate or forked process. + # + # No: The job is run in the same process. + # + # ==== Queues + # + # Yes: Jobs may set which queue they are run in with queue_as or by using the set + # method. + # + # ==== Delayed + # + # Yes: The adapter will run the job in the future through perform_later. + # + # (Gem): An additional gem is required to use perform_later with this adapter. + # + # No: The adapter will run jobs at the next opportunity and cannot use perform_later. + # + # N/A: The adapter does not support queueing. # # NOTE: - # queue_classic does not support Job scheduling. However you can implement this - # yourself or you can use the queue_classic-later gem. See the documentation for - # ActiveJob::QueueAdapters::QueueClassicAdapter. + # queue_classic does not support job scheduling. + # However, you can use the queue_classic-later gem. + # See the documentation for ActiveJob::QueueAdapters::QueueClassicAdapter. + # + # ==== Priorities + # + # The order in which jobs are processed can be configured differently depending + # on the adapter. + # + # Job: Any class inheriting from the adapter may set the priority on the job + # object relative to other jobs. + # + # Queue: The adapter can set the priority for job queues, when setting a queue + # with Active Job this will be respected. + # + # Yes: Allows the priority to be set on the job object, at the queue level or + # as default configuration option. # + # No: Does not allow the priority of jobs to be configured. + # + # N/A: The adapter does not support queueing, and therefore sorting them. + # + # ==== Timeout + # + # When a job will stop after the allotted time. + # + # Job: The timeout can be set for each instance of the job class. + # + # Queue: The timeout is set for all jobs on the queue. + # + # Global: The adapter is configured that all jobs have a maximum run time. + # + # N/A: This adapter does not run in a separate process, and therefore timeout + # is unsupported. + # + # ==== Retries + # + # Job: The number of retries can be set per instance of the job class. + # + # Yes: The Number of retries can be configured globally, for each instance or + # on the queue. This adapter may also present failed instances of the job class + # that can be restarted. + # + # Global: The adapter has a global number of retries. + # + # N/A: The adapter does not run in a separate process, and therefore doesn't + # support retries. module QueueAdapters extend ActiveSupport::Autoload @@ -48,5 +111,18 @@ module ActiveJob autoload :SneakersAdapter autoload :SuckerPunchAdapter autoload :TestAdapter + + ADAPTER = 'Adapter'.freeze + private_constant :ADAPTER + + class << self + # Returns adapter for specified name. + # + # ActiveJob::QueueAdapters.lookup(:sidekiq) + # # => ActiveJob::QueueAdapters::SidekiqAdapter + def lookup(name) + const_get(name.to_s.camelize << ADAPTER) + end + end end end diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb index 2453d065de..17703e3e41 100644 --- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb @@ -13,15 +13,13 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :backburner class BackburnerAdapter - class << self - def enqueue(job) #:nodoc: - Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name - end + def enqueue(job) #:nodoc: + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name + end - def enqueue_at(job, timestamp) #:nodoc: - delay = timestamp - Time.current.to_f - Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay - end + def enqueue_at(job, timestamp) #:nodoc: + delay = timestamp - Time.current.to_f + Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index 69d9e70de3..ac83da2b9c 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -13,14 +13,16 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :delayed_job class DelayedJobAdapter - class << self - def enqueue(job) #:nodoc: - Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) - end + def enqueue(job) #:nodoc: + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) + job.provider_job_id = delayed_job.id + delayed_job + end - def enqueue_at(job, timestamp) #:nodoc: - Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) - end + def enqueue_at(job, timestamp) #:nodoc: + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) + job.provider_job_id = delayed_job.id + delayed_job end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index e25d88e723..8ad5f4de07 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -9,14 +9,12 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :inline class InlineAdapter - class << self - def enqueue(job) #:nodoc: - Base.execute(job.serialize) - end + def enqueue(job) #:nodoc: + Base.execute(job.serialize) + end - def enqueue_at(*) #:nodoc: - raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html") - end + def enqueue_at(*) #:nodoc: + raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html" end end end diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb index 30aa5a4670..0e198922fc 100644 --- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/qu_adapter.rb @@ -16,16 +16,18 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :qu class QuAdapter - class << self - def enqueue(job, *args) #:nodoc: - Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| - payload.instance_variable_set(:@queue, job.queue_name) - end.push - end + def enqueue(job, *args) #:nodoc: + qu_job = Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload| + payload.instance_variable_set(:@queue, job.queue_name) + end.push + + # qu_job can be nil depending on the configured backend + job.provider_job_id = qu_job.id unless qu_job.nil? + qu_job + end - def enqueue_at(job, timestamp, *args) #:nodoc: - raise NotImplementedError - end + def enqueue_at(job, timestamp, *args) #:nodoc: + raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html" end class JobWrapper < Qu::Job #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index e501fe0368..90947aa98d 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -15,14 +15,16 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :que class QueAdapter - class << self - def enqueue(job) #:nodoc: - JobWrapper.enqueue job.serialize, queue: job.queue_name - end + def enqueue(job) #:nodoc: + que_job = JobWrapper.enqueue job.serialize + job.provider_job_id = que_job.attrs["job_id"] + que_job + end - def enqueue_at(job, timestamp) #:nodoc: - JobWrapper.enqueue job.serialize, queue: job.queue_name, run_at: Time.at(timestamp) - end + def enqueue_at(job, timestamp) #:nodoc: + que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp) + job.provider_job_id = que_job.attrs["job_id"] + que_job end class JobWrapper < Que::Job #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index 34c11a68b2..059754a87f 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -17,29 +17,27 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :queue_classic class QueueClassicAdapter - class << self - def enqueue(job) #:nodoc: - build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) - end + def enqueue(job) #:nodoc: + build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) + end - def enqueue_at(job, timestamp) #:nodoc: - queue = build_queue(job.queue_name) - unless queue.respond_to?(:enqueue_at) - raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \ - 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \ - 'You can implement this yourself or you can use the queue_classic-later gem.' - end - queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + def enqueue_at(job, timestamp) #:nodoc: + queue = build_queue(job.queue_name) + unless queue.respond_to?(:enqueue_at) + raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \ + 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \ + 'You can implement this yourself or you can use the queue_classic-later gem.' end + queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + end - # Builds a <tt>QC::Queue</tt> object to schedule jobs on. - # - # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass - # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the - # <tt>build_queue</tt> method. - def build_queue(queue_name) - QC::Queue.new(queue_name) - end + # Builds a <tt>QC::Queue</tt> object to schedule jobs on. + # + # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass + # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the + # <tt>build_queue</tt> method. + def build_queue(queue_name) + QC::Queue.new(queue_name) end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb index 88c6b48fef..417854afd8 100644 --- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb @@ -26,18 +26,16 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :resque class ResqueAdapter - class << self - def enqueue(job) #:nodoc: - Resque.enqueue_to job.queue_name, JobWrapper, job.serialize - end + def enqueue(job) #:nodoc: + Resque.enqueue_to job.queue_name, JobWrapper, job.serialize + end - def enqueue_at(job, timestamp) #:nodoc: - unless Resque.respond_to?(:enqueue_at_with_queue) - raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ - "resque-scheduler gem. Please add it to your Gemfile and run bundle install" - end - Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize + def enqueue_at(job, timestamp) #:nodoc: + unless Resque.respond_to?(:enqueue_at_with_queue) + raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \ + "resque-scheduler gem. Please add it to your Gemfile and run bundle install" end + Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 21005fc728..c321776bf5 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -15,22 +15,22 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :sidekiq class SidekiqAdapter - class << self - def enqueue(job) #:nodoc: - #Sidekiq::Client does not support symbols as keys - Sidekiq::Client.push \ - 'class' => JobWrapper, - 'queue' => job.queue_name, - 'args' => [ job.serialize ] - end + def enqueue(job) #:nodoc: + #Sidekiq::Client does not support symbols as keys + job.provider_job_id = Sidekiq::Client.push \ + 'class' => JobWrapper, + 'wrapped' => job.class.to_s, + 'queue' => job.queue_name, + 'args' => [ job.serialize ] + end - def enqueue_at(job, timestamp) #:nodoc: - Sidekiq::Client.push \ - 'class' => JobWrapper, - 'queue' => job.queue_name, - 'args' => [ job.serialize ], - 'at' => timestamp - end + def enqueue_at(job, timestamp) #:nodoc: + job.provider_job_id = Sidekiq::Client.push \ + 'class' => JobWrapper, + 'wrapped' => job.class.to_s, + 'queue' => job.queue_name, + 'args' => [ job.serialize ], + 'at' => timestamp end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index 6d60a2f303..f102c6567e 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -16,19 +16,19 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :sneakers class SneakersAdapter - @monitor = Monitor.new + def initialize + @monitor = Monitor.new + end - class << self - def enqueue(job) #:nodoc: - @monitor.synchronize do - JobWrapper.from_queue job.queue_name - JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) - end + def enqueue(job) #:nodoc: + @monitor.synchronize do + JobWrapper.from_queue job.queue_name + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) end + end - def enqueue_at(job, timestamp) #:nodoc: - raise NotImplementedError - end + def enqueue_at(job, timestamp) #:nodoc: + raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html" end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb index be9e7fd03a..c6c35f8ab4 100644 --- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb @@ -18,14 +18,12 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :sucker_punch class SuckerPunchAdapter - class << self - def enqueue(job) #:nodoc: - JobWrapper.new.async.perform job.serialize - end + def enqueue(job) #:nodoc: + JobWrapper.new.async.perform job.serialize + end - def enqueue_at(job, timestamp) #:nodoc: - raise NotImplementedError - end + def enqueue_at(job, timestamp) #:nodoc: + raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html" end class JobWrapper #:nodoc: diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb index ea9df9a063..9b7b7139f4 100644 --- a/activejob/lib/active_job/queue_adapters/test_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -10,15 +10,9 @@ module ActiveJob # # Rails.application.config.active_job.queue_adapter = :test class TestAdapter - delegate :name, to: :class - attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs) + attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter) attr_writer(:enqueued_jobs, :performed_jobs) - def initialize - self.perform_enqueued_jobs = false - self.perform_enqueued_at_jobs = false - end - # Provides a store of all the enqueued jobs with the TestAdapter so you can check them. def enqueued_jobs @enqueued_jobs ||= [] @@ -30,22 +24,37 @@ module ActiveJob end def enqueue(job) #:nodoc: - if perform_enqueued_jobs - performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name} - Base.execute job.serialize - else - enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name} - end + return if filtered?(job) + + job_data = job_to_hash(job) + enqueue_or_perform(perform_enqueued_jobs, job, job_data) end def enqueue_at(job, timestamp) #:nodoc: - if perform_enqueued_at_jobs - performed_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp} + return if filtered?(job) + + job_data = job_to_hash(job, at: timestamp) + enqueue_or_perform(perform_enqueued_at_jobs, job, job_data) + end + + private + + def job_to_hash(job, extras = {}) + { job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras) + end + + def enqueue_or_perform(perform, job, job_data) + if perform + performed_jobs << job_data Base.execute job.serialize else - enqueued_jobs << {job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp} + enqueued_jobs << job_data end end + + def filtered?(job) + filter && !Array(filter).include?(job.class) + end end end end diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb index d18656e398..9b307e8dc8 100644 --- a/activejob/lib/active_job/test_helper.rb +++ b/activejob/lib/active_job/test_helper.rb @@ -1,3 +1,4 @@ +require 'active_support/core_ext/class/subclasses' require 'active_support/core_ext/hash/keys' module ActiveJob @@ -7,8 +8,17 @@ module ActiveJob included do def before_setup - @old_queue_adapter = queue_adapter - ActiveJob::Base.queue_adapter = :test + test_adapter = ActiveJob::QueueAdapters::TestAdapter.new + + @old_queue_adapters = (ActiveJob::Base.subclasses << ActiveJob::Base).select do |klass| + # only override explicitly set adapters, a quirk of `class_attribute` + klass.singleton_class.public_instance_methods(false).include?(:_queue_adapter) + end.map do |klass| + [klass, klass.queue_adapter].tap do + klass.queue_adapter = test_adapter + end + end + clear_enqueued_jobs clear_performed_jobs super @@ -16,7 +26,9 @@ module ActiveJob def after_teardown super - ActiveJob::Base.queue_adapter = @old_queue_adapter + @old_queue_adapters.each do |(klass, adapter)| + klass.queue_adapter = adapter + end end # Asserts that the number of enqueued jobs matches the given number. @@ -56,7 +68,7 @@ module ActiveJob original_count = enqueued_jobs_size(only: only) yield new_count = enqueued_jobs_size(only: only) - assert_equal original_count + number, new_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued" + assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued" else actual_count = enqueued_jobs_size(only: only) assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued" @@ -125,12 +137,34 @@ module ActiveJob # HelloJob.perform_later('sean') # end # end - def assert_performed_jobs(number) + # + # The block form supports filtering. If the :only option is specified, + # then only the listed job(s) will be performed. + # + # def test_hello_job + # assert_performed_jobs 1, only: HelloJob do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later + # end + # end + # + # An array may also be specified, to support testing multiple jobs. + # + # def test_hello_and_logging_jobs + # assert_nothing_raised do + # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later('stewie') + # RescueJob.perform_later('david') + # end + # end + # end + def assert_performed_jobs(number, only: nil) if block_given? original_count = performed_jobs.size - perform_enqueued_jobs { yield } + perform_enqueued_jobs(only: only) { yield } new_count = performed_jobs.size - assert_equal original_count + number, new_count, + assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were performed" else performed_jobs_size = performed_jobs.size @@ -157,11 +191,33 @@ module ActiveJob # end # end # + # The block form supports filtering. If the :only option is specified, + # then only the listed job(s) will be performed. + # + # def test_hello_job + # assert_performed_jobs 1, only: HelloJob do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later + # end + # end + # + # An array may also be specified, to support testing multiple jobs. + # + # def test_hello_and_logging_jobs + # assert_nothing_raised do + # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later('stewie') + # RescueJob.perform_later('david') + # end + # end + # end + # # Note: This assertion is simply a shortcut for: # # assert_performed_jobs 0, &block - def assert_no_performed_jobs(&block) - assert_performed_jobs 0, &block + def assert_no_performed_jobs(only: nil, &block) + assert_performed_jobs 0, only: only, &block end # Asserts that the job passed in the block has been enqueued with the given arguments. @@ -175,9 +231,10 @@ module ActiveJob original_enqueued_jobs = enqueued_jobs.dup clear_enqueued_jobs args.assert_valid_keys(:job, :args, :at, :queue) + serialized_args = serialize_args_for_assertion(args) yield matching_job = enqueued_jobs.any? do |job| - args.all? { |key, value| value == job[key] } + serialized_args.all? { |key, value| value == job[key] } end assert matching_job, "No enqueued job found with #{args}" ensure @@ -195,24 +252,31 @@ module ActiveJob original_performed_jobs = performed_jobs.dup clear_performed_jobs args.assert_valid_keys(:job, :args, :at, :queue) + serialized_args = serialize_args_for_assertion(args) perform_enqueued_jobs { yield } matching_job = performed_jobs.any? do |job| - args.all? { |key, value| value == job[key] } + serialized_args.all? { |key, value| value == job[key] } end assert matching_job, "No performed job found with #{args}" ensure queue_adapter.performed_jobs = original_performed_jobs + performed_jobs end - def perform_enqueued_jobs - @old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs - @old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs - queue_adapter.perform_enqueued_jobs = true - queue_adapter.perform_enqueued_at_jobs = true - yield - ensure - queue_adapter.perform_enqueued_jobs = @old_perform_enqueued_jobs - queue_adapter.perform_enqueued_at_jobs = @old_perform_enqueued_at_jobs + def perform_enqueued_jobs(only: nil) + old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs + old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs + old_filter = queue_adapter.filter + + begin + queue_adapter.perform_enqueued_jobs = true + queue_adapter.perform_enqueued_at_jobs = true + queue_adapter.filter = only + yield + ensure + queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs + queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs + queue_adapter.filter = old_filter + end end def queue_adapter @@ -234,11 +298,19 @@ module ActiveJob def enqueued_jobs_size(only: nil) if only - enqueued_jobs.select { |job| job[:job] == only }.size + enqueued_jobs.select { |job| job.fetch(:job) == only }.size else enqueued_jobs.size end end + + def serialize_args_for_assertion(args) + serialized_args = args.dup + if job_args = serialized_args.delete(:args) + serialized_args[:args] = ActiveJob::Arguments.serialize(job_args) + end + serialized_args + end end end end |