diff options
Diffstat (limited to 'activejob/lib')
-rw-r--r-- | activejob/lib/active_job.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/arguments.rb | 55 | ||||
-rw-r--r-- | activejob/lib/active_job/base.rb | 47 | ||||
-rw-r--r-- | activejob/lib/active_job/callbacks.rb | 6 | ||||
-rw-r--r-- | activejob/lib/active_job/core.rb | 41 | ||||
-rw-r--r-- | activejob/lib/active_job/enqueuing.rb | 6 | ||||
-rw-r--r-- | activejob/lib/active_job/execution.rb | 1 | ||||
-rw-r--r-- | activejob/lib/active_job/gem_version.rb | 6 | ||||
-rw-r--r-- | activejob/lib/active_job/logging.rb | 11 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapter.rb | 14 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters.rb | 9 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/inline_adapter.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb | 10 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/test_adapter.rb | 42 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_name.rb | 17 | ||||
-rw-r--r-- | activejob/lib/active_job/test_helper.rb | 153 |
18 files changed, 342 insertions, 88 deletions
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index 1b582f5877..3d4f63b261 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -1,5 +1,5 @@ #-- -# Copyright (c) 2014 David Heinemeier Hansson +# Copyright (c) 2014-2015 David Heinemeier Hansson # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb index 86b85ebdba..622c37098e 100644 --- a/activejob/lib/active_job/arguments.rb +++ b/activejob/lib/active_job/arguments.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash' + module ActiveJob # Raised when an exception is raised during job arguments deserialization. # @@ -24,10 +26,16 @@ module ActiveJob extend self TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ] + # Serializes a set of arguments. Whitelisted types are returned + # as-is. Arrays/Hashes are serialized element by element. + # All other types are serialized using GlobalID. def serialize(arguments) arguments.map { |argument| serialize_argument(argument) } end + # Deserializes a set of arguments. Whitelisted types are returned + # as-is. Arrays/Hashes are deserialized element by element. + # All other types are deserialized using GlobalID. def deserialize(arguments) arguments.map { |argument| deserialize_argument(argument) } rescue => e @@ -36,7 +44,9 @@ module ActiveJob private GLOBALID_KEY = '_aj_globalid'.freeze - private_constant :GLOBALID_KEY + SYMBOL_KEYS_KEY = '_aj_symbol_keys'.freeze + WITH_INDIFFERENT_ACCESS_KEY = '_aj_hash_with_indifferent_access'.freeze + private_constant :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY def serialize_argument(argument) case argument @@ -46,10 +56,15 @@ module ActiveJob { GLOBALID_KEY => argument.to_global_id.to_s } when Array argument.map { |arg| serialize_argument(arg) } + when ActiveSupport::HashWithIndifferentAccess + result = serialize_hash(argument) + result[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true) + result when Hash - argument.each_with_object({}) do |(key, value), hash| - hash[serialize_hash_key(key)] = serialize_argument(value) - end + symbol_keys = argument.each_key.grep(Symbol).map(&:to_s) + result = serialize_hash(argument) + result[SYMBOL_KEYS_KEY] = symbol_keys + result else raise SerializationError.new("Unsupported argument type: #{argument.class.name}") end @@ -67,7 +82,7 @@ module ActiveJob if serialized_global_id?(argument) deserialize_global_id argument else - deserialize_hash argument + deserialize_hash(argument) end else raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}" @@ -82,13 +97,27 @@ module ActiveJob GlobalID::Locator.locate hash[GLOBALID_KEY] end + def serialize_hash(argument) + argument.each_with_object({}) do |(key, value), hash| + hash[serialize_hash_key(key)] = serialize_argument(value) + end + end + def deserialize_hash(serialized_hash) - serialized_hash.each_with_object({}.with_indifferent_access) do |(key, value), hash| - hash[key] = deserialize_argument(value) + result = serialized_hash.transform_values { |v| deserialize_argument(v) } + if result.delete(WITH_INDIFFERENT_ACCESS_KEY) + result = result.with_indifferent_access + elsif symbol_keys = result.delete(SYMBOL_KEYS_KEY) + result = transform_symbol_keys(result, symbol_keys) end + result end - RESERVED_KEYS = [GLOBALID_KEY, GLOBALID_KEY.to_sym] + RESERVED_KEYS = [ + GLOBALID_KEY, GLOBALID_KEY.to_sym, + SYMBOL_KEYS_KEY, SYMBOL_KEYS_KEY.to_sym, + WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym, + ] private_constant :RESERVED_KEYS def serialize_hash_key(key) @@ -101,5 +130,15 @@ module ActiveJob raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}") end end + + def transform_symbol_keys(hash, symbol_keys) + hash.transform_keys do |key| + if symbol_keys.include?(key) + key.to_sym + else + key + end + end + end end end diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index a3bec1f827..fd49b3fda5 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -6,7 +6,52 @@ require 'active_job/execution' require 'active_job/callbacks' require 'active_job/logging' -module ActiveJob +module ActiveJob #:nodoc: + # = Active Job + # + # Active Job objects can be configured to work with different backend + # queuing frameworks. To specify a queue adapter to use: + # + # ActiveJob::Base.queue_adapter = :inline + # + # A list of supported adapters can be found in QueueAdapters. + # + # Active Job objects can be defined by creating a class that inherits + # from the ActiveJob::Base class. The only necessary method to + # implement is the "perform" method. + # + # To define an Active Job object: + # + # class ProcessPhotoJob < ActiveJob::Base + # def perform(photo) + # photo.watermark!('Rails') + # photo.rotate!(90.degrees) + # photo.resize_to_fit!(300, 300) + # photo.upload! + # end + # end + # + # Records that are passed in are serialized/deserialized using Global + # ID. More information can be found in Arguments. + # + # To enqueue a job to be performed as soon the queueing system is free: + # + # ProcessPhotoJob.perform_later(photo) + # + # To enqueue a job to be processed at some point in the future: + # + # ProcessPhotoJob.set(wait_until: Date.tomorrow.noon).perform_later(photo) + # + # More information can be found in ActiveJob::Core::ClassMethods#set + # + # A job can also be processed immediately without sending to the queue: + # + # ProcessPhotoJob.perform_now(photo) + # + # == Exceptions + # + # * DeserializationError - Error class for deserialization errors. + # * SerializationError - Error class for serialization errors. class Base include Core include QueueAdapter diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb index 29e2a878b4..2b6149e84e 100644 --- a/activejob/lib/active_job/callbacks.rb +++ b/activejob/lib/active_job/callbacks.rb @@ -3,8 +3,8 @@ require 'active_support/callbacks' module ActiveJob # = Active Job Callbacks # - # Active Job provides hooks during the lifecycle of a job. Callbacks allow you - # to trigger logic during the lifecycle of a job. Available callbacks are: + # Active Job provides hooks during the life cycle of a job. Callbacks allow you + # to trigger logic during the life cycle of a job. Available callbacks are: # # * <tt>before_enqueue</tt> # * <tt>around_enqueue</tt> @@ -22,6 +22,8 @@ module ActiveJob define_callbacks :enqueue end + # These methods will be included into any Active Job object, adding + # callbacks for +perform+ and +enqueue+ methods. module ClassMethods # Defines a callback that will get called right before the # job's perform method is executed. diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index f55db5a588..ddd7d1361c 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -17,13 +17,13 @@ module ActiveJob attr_writer :queue_name end + # These methods will be included into any Active Job object, adding + # helpers for de/serialization and creation of job instances. module ClassMethods # Creates a new job instance from a hash created with +serialize+ def deserialize(job_data) - job = job_data['job_class'].constantize.new - job.job_id = job_data['job_id'] - job.queue_name = job_data['queue_name'] - job.serialized_arguments = job_data['arguments'] + job = job_data['job_class'].constantize.new + job.deserialize(job_data) job end @@ -48,8 +48,8 @@ module ActiveJob end end - # Creates a new job instance. Takes as arguments the arguments that - # will be passed to the perform method. + # Creates a new job instance. Takes the arguments that will be + # passed to the perform method. def initialize(*arguments) @arguments = arguments @job_id = SecureRandom.uuid @@ -67,6 +67,32 @@ module ActiveJob } end + # Attaches the stored job data to the current instance. Receives a hash + # returned from +serialize+ + # + # ==== Examples + # + # class DeliverWebhookJob < ActiveJob::Base + # def serialize + # super.merge('attempt_number' => (@attempt_number || 0) + 1) + # end + # + # def deserialize(job_data) + # super + # @attempt_number = job_data['attempt_number'] + # end + # + # rescue_from(TimeoutError) do |exception| + # raise exception if @attempt_number > 5 + # retry_job(wait: 10) + # end + # end + def deserialize(job_data) + self.job_id = job_data['job_id'] + self.queue_name = job_data['queue_name'] + self.serialized_arguments = job_data['arguments'] + end + private def deserialize_arguments_if_needed if defined?(@serialized_arguments) && @serialized_arguments.present? @@ -84,6 +110,3 @@ module ActiveJob end end end - - - diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 74bcc1fa5d..430c17e1bf 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -4,13 +4,14 @@ module ActiveJob module Enqueuing extend ActiveSupport::Concern + # Includes the +perform_later+ method for job initialization. module ClassMethods # Push a job onto the queue. The arguments must be legal JSON types # (string, int, float, nil, true, false, hash or array) or # GlobalID::Identification instances. Arbitrary Ruby objects # are not supported. # - # Returns an instance of the job class queued with args available in + # Returns an instance of the job class queued with arguments available in # Job#arguments. def perform_later(*args) job_or_instantiate(*args).enqueue @@ -22,7 +23,7 @@ module ActiveJob end end - # Reschedule the job to be re-executed. This is useful in combination + # Reschedules the job to be re-executed. This is useful in combination # with the +rescue_from+ option. When you rescue an exception from your job # you can ask Active Job to retry performing your job. # @@ -37,6 +38,7 @@ module ActiveJob # rescue_from(ErrorLoadingSite) do # retry_job queue: :low_priority # end + # # def perform(*args) # # raise ErrorLoadingSite if cannot scrape # end diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 7ff857206d..79d232da4a 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -6,6 +6,7 @@ module ActiveJob extend ActiveSupport::Concern include ActiveSupport::Rescuable + # Includes methods for executing and performing jobs instantly. module ClassMethods # Performs the job immediately. # diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb index f80e6563b8..27a5de93f4 100644 --- a/activejob/lib/active_job/gem_version.rb +++ b/activejob/lib/active_job/gem_version.rb @@ -5,10 +5,10 @@ module ActiveJob end module VERSION - MAJOR = 4 - MINOR = 2 + MAJOR = 5 + MINOR = 0 TINY = 0 - PRE = "beta2" + PRE = "alpha" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index bb96668cfb..cd29e6908e 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -3,7 +3,7 @@ require 'active_support/tagged_logging' require 'active_support/logger' module ActiveJob - module Logging + module Logging #:nodoc: extend ActiveSupport::Concern included do @@ -75,7 +75,7 @@ module ActiveJob def perform(event) info do job = event.payload[:job] - "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" + "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2)}ms" end end @@ -85,7 +85,12 @@ module ActiveJob end def args_info(job) - job.arguments.any? ? " with arguments: #{job.arguments.map(&:inspect).join(", ")}" : "" + if job.arguments.any? + ' with arguments: ' + + job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg.inspect }.join(', ') + else + '' + end end def scheduled_at(event) diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb index fb54aec75e..d610d30e01 100644 --- a/activejob/lib/active_job/queue_adapter.rb +++ b/activejob/lib/active_job/queue_adapter.rb @@ -2,12 +2,18 @@ require 'active_job/queue_adapters/inline_adapter' require 'active_support/core_ext/string/inflections' module ActiveJob - module QueueAdapter + # The <tt>ActiveJob::QueueAdapter</tt> module is used to load the + # correct adapter. The default queue adapter is the :inline queue. + module QueueAdapter #:nodoc: extend ActiveSupport::Concern + # Includes the setter method for changing the active queue adapter. module ClassMethods mattr_reader(:queue_adapter) { ActiveJob::QueueAdapters::InlineAdapter } + # Specify the backend queue provider. The default queue adapter + # is the :inline queue. See QueueAdapters for more + # information. def queue_adapter=(name_or_adapter) @@queue_adapter = \ case name_or_adapter @@ -15,8 +21,8 @@ module ActiveJob ActiveJob::QueueAdapters::TestAdapter.new when Symbol, String load_adapter(name_or_adapter) - when Class - name_or_adapter + else + name_or_adapter if name_or_adapter.respond_to?(:enqueue) end end @@ -26,4 +32,4 @@ module ActiveJob end end end -end
\ No newline at end of file +end diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb index e865c901ce..4b91c93dbe 100644 --- a/activejob/lib/active_job/queue_adapters.rb +++ b/activejob/lib/active_job/queue_adapters.rb @@ -7,20 +7,21 @@ module ActiveJob # * {Delayed Job}[https://github.com/collectiveidea/delayed_job] # * {Qu}[https://github.com/bkeepers/qu] # * {Que}[https://github.com/chanks/que] - # * {QueueClassic 2.x}[https://github.com/ryandotsmith/queue_classic/tree/v2.2.3] + # * {queue_classic}[https://github.com/QueueClassic/queue_classic] # * {Resque 1.x}[https://github.com/resque/resque/tree/1-x-stable] # * {Sidekiq}[http://sidekiq.org] # * {Sneakers}[https://github.com/jondot/sneakers] # * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch] # - # #### Backends Features + # === Backends Features # # | | Async | Queues | Delayed | Priorities | Timeout | Retries | # |-------------------|-------|--------|-----------|------------|---------|---------| # | Backburner | Yes | Yes | Yes | Yes | Job | Global | # | Delayed Job | Yes | Yes | Yes | Job | Global | Global | + # | Qu | Yes | Yes | No | No | No | Global | # | Que | Yes | Yes | Yes | Job | No | Job | - # | Queue Classic | Yes | Yes | No* | No | No | No | + # | queue_classic | Yes | Yes | No* | No | No | No | # | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes | # | Sidekiq | Yes | Yes | Yes | Queue | No | Job | # | Sneakers | Yes | Yes | No | Queue | Queue | No | @@ -29,7 +30,7 @@ module ActiveJob # | Active Job | Yes | Yes | Yes | No | No | No | # # NOTE: - # Queue Classic does not support Job scheduling. However you can implement this + # queue_classic does not support Job scheduling. However you can implement this # yourself or you can use the queue_classic-later gem. See the documentation for # ActiveJob::QueueAdapters::QueueClassicAdapter. # diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index 4d27c4fff8..69d9e70de3 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -6,10 +6,10 @@ module ActiveJob # # Delayed::Job (or DJ) encapsulates the common pattern of asynchronously # executing longer tasks in the background. Although DJ can have many - # storage backends one of the most used is based on Active Record. + # storage backends, one of the most used is based on Active Record. # Read more about Delayed Job {here}[https://github.com/collectiveidea/delayed_job]. # - # To use Delayed Job set the queue_adapter config to +:delayed_job+. + # To use Delayed Job, set the queue_adapter config to +:delayed_job+. # # Rails.application.config.active_job.queue_adapter = :delayed_job class DelayedJobAdapter diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb index e498454909..e25d88e723 100644 --- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb @@ -15,7 +15,7 @@ module ActiveJob end def enqueue_at(*) #:nodoc: - raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at https://github.com/rails/activejob") + raise NotImplementedError.new("Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html") end end end diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb index f160932578..34c11a68b2 100644 --- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -2,7 +2,7 @@ require 'queue_classic' module ActiveJob module QueueAdapters - # == Queue Classic adapter for Active Job + # == queue_classic adapter for Active Job # # queue_classic provides a simple interface to a PostgreSQL-backed message # queue. queue_classic specializes in concurrent locking and minimizing @@ -11,9 +11,9 @@ module ActiveJob # production environment and that adding another dependency (e.g. redis, # beanstalkd, 0mq) is undesirable. # - # Read more about Queue Classic {here}[https://github.com/ryandotsmith/queue_classic]. + # Read more about queue_classic {here}[https://github.com/QueueClassic/queue_classic]. # - # To use Queue Classic set the queue_adapter config to +:queue_classic+. + # To use queue_classic set the queue_adapter config to +:queue_classic+. # # Rails.application.config.active_job.queue_adapter = :queue_classic class QueueClassicAdapter @@ -25,8 +25,8 @@ module ActiveJob def enqueue_at(job, timestamp) #:nodoc: queue = build_queue(job.queue_name) unless queue.respond_to?(:enqueue_at) - raise NotImplementedError, 'To be able to schedule jobs with Queue Classic ' \ - 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' + raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \ + 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \ 'You can implement this yourself or you can use the queue_classic-later gem.' end queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 7d80a6fd7a..21005fc728 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -21,8 +21,7 @@ module ActiveJob Sidekiq::Client.push \ 'class' => JobWrapper, 'queue' => job.queue_name, - 'args' => [ job.serialize ], - 'retry' => true + 'args' => [ job.serialize ] end def enqueue_at(job, timestamp) #:nodoc: @@ -30,7 +29,6 @@ module ActiveJob 'class' => JobWrapper, 'queue' => job.queue_name, 'args' => [ job.serialize ], - 'retry' => true, 'at' => timestamp end end diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb index e4fdf60008..c9e2bdca27 100644 --- a/activejob/lib/active_job/queue_adapters/test_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb @@ -11,9 +11,14 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :test class TestAdapter delegate :name, to: :class - attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs) + attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter) attr_writer(:enqueued_jobs, :performed_jobs) + def initialize + self.perform_enqueued_jobs = false + self.perform_enqueued_at_jobs = false + end + # Provides a store of all the enqueued jobs with the TestAdapter so you can check them. def enqueued_jobs @enqueued_jobs ||= [] @@ -25,22 +30,33 @@ module ActiveJob end def enqueue(job) #:nodoc: - if perform_enqueued_jobs - performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} - job.perform_now - else - enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name} - end + return if filtered?(job) + + job_data = { job: job.class, args: job.serialize['arguments'], queue: job.queue_name } + enqueue_or_perform(perform_enqueued_jobs, job, job_data) end def enqueue_at(job, timestamp) #:nodoc: - if perform_enqueued_at_jobs - performed_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} - job.perform_now - else - enqueued_jobs << {job: job.class, args: job.arguments, queue: job.queue_name, at: timestamp} - end + return if filtered?(job) + + job_data = { job: job.class, args: job.serialize['arguments'], queue: job.queue_name, at: timestamp } + enqueue_or_perform(perform_enqueued_at_jobs, job, job_data) end + + private + + def enqueue_or_perform(perform, job, job_data) + if perform + performed_jobs << job_data + Base.execute job.serialize + else + enqueued_jobs << job_data + end + end + + def filtered?(job) + filter && !Array(filter).include?(job.class) + end end end end diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb index 45acb71605..9ae0345120 100644 --- a/activejob/lib/active_job/queue_name.rb +++ b/activejob/lib/active_job/queue_name.rb @@ -2,10 +2,20 @@ module ActiveJob module QueueName extend ActiveSupport::Concern + # Includes the ability to override the default queue name and prefix. module ClassMethods mattr_accessor(:queue_name_prefix) mattr_accessor(:default_queue_name) { "default" } + # Specifies the name of the queue to process the job on. + # + # class PublishToFeedJob < ActiveJob::Base + # queue_as :feeds + # + # def perform(post) + # post.to_feed! + # end + # end def queue_as(part_name=nil, &block) if block_given? self.queue_name = block @@ -15,15 +25,18 @@ module ActiveJob end def queue_name_from_part(part_name) #:nodoc: - queue_name = part_name.to_s.presence || default_queue_name + queue_name = part_name || default_queue_name name_parts = [queue_name_prefix.presence, queue_name] - name_parts.compact.join('_') + name_parts.compact.join(queue_name_delimiter) end end included do class_attribute :queue_name, instance_accessor: false + class_attribute :queue_name_delimiter, instance_accessor: false + self.queue_name = default_queue_name + self.queue_name_delimiter = '_' # set default delimiter to '_' end # Returns the name of the queue the job will be run on diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb index af62fae9b9..25bc99a4f8 100644 --- a/activejob/lib/active_job/test_helper.rb +++ b/activejob/lib/active_job/test_helper.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/keys' + module ActiveJob # Provides helper methods for testing Active Job module TestHelper @@ -40,20 +42,28 @@ module ActiveJob # HelloJob.perform_later('rafael') # end # end - def assert_enqueued_jobs(number) + # + # The number of times a specific job is enqueued can be asserted. + # + # def test_logging_job + # assert_enqueued_jobs 2, only: LoggingJob do + # LoggingJob.perform_later + # HelloJob.perform_later('jeremy') + # end + # end + def assert_enqueued_jobs(number, only: nil) if block_given? - original_count = enqueued_jobs.size + original_count = enqueued_jobs_size(only: only) yield - new_count = enqueued_jobs.size - assert_equal original_count + number, new_count, - "#{number} jobs expected, but #{new_count - original_count} were enqueued" + new_count = enqueued_jobs_size(only: only) + assert_equal original_count + number, new_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued" else - enqueued_jobs_size = enqueued_jobs.size - assert_equal number, enqueued_jobs_size, "#{number} jobs expected, but #{enqueued_jobs_size} were enqueued" + actual_count = enqueued_jobs_size(only: only) + assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued" end end - # Assert that no job have been enqueued. + # Asserts that no jobs have been enqueued. # # def test_jobs # assert_no_enqueued_jobs @@ -69,21 +79,37 @@ module ActiveJob # end # end # + # It can be asserted that no jobs of a specific kind are enqueued: + # + # def test_no_logging + # assert_no_enqueued_jobs only: LoggingJob do + # HelloJob.perform_later('jeremy') + # end + # end + # # Note: This assertion is simply a shortcut for: # - # assert_enqueued_jobs 0 - def assert_no_enqueued_jobs(&block) - assert_enqueued_jobs 0, &block + # assert_enqueued_jobs 0, &block + def assert_no_enqueued_jobs(only: nil, &block) + assert_enqueued_jobs 0, only: only, &block end # Asserts that the number of performed jobs matches the given number. + # If no block is passed, <tt>perform_enqueued_jobs</tt> + # must be called around the job call. # # def test_jobs # assert_performed_jobs 0 - # HelloJob.perform_later('xavier') + # + # perform_enqueued_jobs do + # HelloJob.perform_later('xavier') + # end # assert_performed_jobs 1 - # HelloJob.perform_later('yves') - # assert_performed_jobs 2 + # + # perform_enqueued_jobs do + # HelloJob.perform_later('yves') + # assert_performed_jobs 2 + # end # end # # If a block is passed, that block should cause the specified number of @@ -99,10 +125,32 @@ module ActiveJob # HelloJob.perform_later('sean') # end # end - def assert_performed_jobs(number) + # + # The block form supports filtering. If the :only option is specified, + # then only the listed job(s) will be performed. + # + # def test_hello_job + # assert_performed_jobs 1, only: HelloJob do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later + # end + # end + # + # An array may also be specified, to support testing multiple jobs. + # + # def test_hello_and_logging_jobs + # assert_nothing_raised do + # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later('stewie') + # RescueJob.perform_later('david') + # end + # end + # end + def assert_performed_jobs(number, only: nil) if block_given? original_count = performed_jobs.size - yield + perform_enqueued_jobs(only: only) { yield } new_count = performed_jobs.size assert_equal original_count + number, new_count, "#{number} jobs expected, but #{new_count - original_count} were performed" @@ -116,8 +164,11 @@ module ActiveJob # # def test_jobs # assert_no_performed_jobs - # HelloJob.perform_later('matthew') - # assert_performed_jobs 1 + # + # perform_enqueued_jobs do + # HelloJob.perform_later('matthew') + # assert_performed_jobs 1 + # end # end # # If a block is passed, that block should not cause any job to be performed. @@ -128,16 +179,38 @@ module ActiveJob # end # end # + # The block form supports filtering. If the :only option is specified, + # then only the listed job(s) will be performed. + # + # def test_hello_job + # assert_performed_jobs 1, only: HelloJob do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later + # end + # end + # + # An array may also be specified, to support testing multiple jobs. + # + # def test_hello_and_logging_jobs + # assert_nothing_raised do + # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do + # HelloJob.perform_later('jeremy') + # LoggingJob.perform_later('stewie') + # RescueJob.perform_later('david') + # end + # end + # end + # # Note: This assertion is simply a shortcut for: # - # assert_performed_jobs 0 - def assert_no_performed_jobs(&block) - assert_performed_jobs 0, &block + # assert_performed_jobs 0, &block + def assert_no_performed_jobs(only: nil, &block) + assert_performed_jobs 0, only: only, &block end # Asserts that the job passed in the block has been enqueued with the given arguments. # - # def assert_enqueued_job + # def test_assert_enqueued_with # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do # MyJob.perform_later(1,2,3) # end @@ -146,9 +219,10 @@ module ActiveJob original_enqueued_jobs = enqueued_jobs.dup clear_enqueued_jobs args.assert_valid_keys(:job, :args, :at, :queue) + serialized_args = serialize_args_for_assertion(args) yield matching_job = enqueued_jobs.any? do |job| - args.all? { |key, value| value == job[key] } + serialized_args.all? { |key, value| value == job[key] } end assert matching_job, "No enqueued job found with #{args}" ensure @@ -166,15 +240,28 @@ module ActiveJob original_performed_jobs = performed_jobs.dup clear_performed_jobs args.assert_valid_keys(:job, :args, :at, :queue) - yield + serialized_args = serialize_args_for_assertion(args) + perform_enqueued_jobs { yield } matching_job = performed_jobs.any? do |job| - args.all? { |key, value| value == job[key] } + serialized_args.all? { |key, value| value == job[key] } end assert matching_job, "No performed job found with #{args}" ensure queue_adapter.performed_jobs = original_performed_jobs + performed_jobs end + def perform_enqueued_jobs(only: nil) + @old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs + @old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs + queue_adapter.perform_enqueued_jobs = true + queue_adapter.perform_enqueued_at_jobs = true + queue_adapter.filter = only + yield + ensure + queue_adapter.perform_enqueued_jobs = @old_perform_enqueued_jobs + queue_adapter.perform_enqueued_at_jobs = @old_perform_enqueued_at_jobs + end + def queue_adapter ActiveJob::Base.queue_adapter end @@ -191,6 +278,22 @@ module ActiveJob def clear_performed_jobs performed_jobs.clear end + + def enqueued_jobs_size(only: nil) + if only + enqueued_jobs.select { |job| job[:job] == only }.size + else + enqueued_jobs.size + end + end + + def serialize_args_for_assertion(args) + serialized_args = args.dup + if job_args = serialized_args.delete(:args) + serialized_args[:args] = ActiveJob::Arguments.serialize(job_args) + end + serialized_args + end end end end |