aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib
diff options
context:
space:
mode:
Diffstat (limited to 'activejob/lib')
-rw-r--r--activejob/lib/active_job.rb14
-rw-r--r--activejob/lib/active_job/arguments.rb104
-rw-r--r--activejob/lib/active_job/async_job.rb77
-rw-r--r--activejob/lib/active_job/base.rb26
-rw-r--r--activejob/lib/active_job/callbacks.rb17
-rw-r--r--activejob/lib/active_job/configured_job.rb4
-rw-r--r--activejob/lib/active_job/core.rb107
-rw-r--r--activejob/lib/active_job/enqueuing.rb51
-rw-r--r--activejob/lib/active_job/exceptions.rb143
-rw-r--r--activejob/lib/active_job/execution.rb19
-rw-r--r--activejob/lib/active_job/gem_version.rb4
-rw-r--r--activejob/lib/active_job/logging.rb160
-rw-r--r--activejob/lib/active_job/queue_adapter.rb61
-rw-r--r--activejob/lib/active_job/queue_adapters.rb21
-rw-r--r--activejob/lib/active_job/queue_adapters/async_adapter.rb107
-rw-r--r--activejob/lib/active_job/queue_adapters/backburner_adapter.rb8
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb8
-rw-r--r--activejob/lib/active_job/queue_adapters/inline_adapter.rb6
-rw-r--r--activejob/lib/active_job/queue_adapters/qu_adapter.rb44
-rw-r--r--activejob/lib/active_job/queue_adapters/que_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb10
-rw-r--r--activejob/lib/active_job/queue_adapters/resque_adapter.rb13
-rw-r--r--activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb26
-rw-r--r--activejob/lib/active_job/queue_adapters/sneakers_adapter.rb8
-rw-r--r--activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb29
-rw-r--r--activejob/lib/active_job/queue_adapters/test_adapter.rb47
-rw-r--r--activejob/lib/active_job/queue_name.rb16
-rw-r--r--activejob/lib/active_job/queue_priority.rb11
-rw-r--r--activejob/lib/active_job/railtie.rb36
-rw-r--r--activejob/lib/active_job/serializers.rb63
-rw-r--r--activejob/lib/active_job/serializers/date_serializer.rb21
-rw-r--r--activejob/lib/active_job/serializers/date_time_serializer.rb21
-rw-r--r--activejob/lib/active_job/serializers/duration_serializer.rb24
-rw-r--r--activejob/lib/active_job/serializers/object_serializer.rb54
-rw-r--r--activejob/lib/active_job/serializers/symbol_serializer.rb21
-rw-r--r--activejob/lib/active_job/serializers/time_serializer.rb21
-rw-r--r--activejob/lib/active_job/serializers/time_with_zone_serializer.rb21
-rw-r--r--activejob/lib/active_job/test_case.rb6
-rw-r--r--activejob/lib/active_job/test_helper.rb873
-rw-r--r--activejob/lib/active_job/timezones.rb13
-rw-r--r--activejob/lib/active_job/translation.rb4
-rw-r--r--activejob/lib/active_job/version.rb4
-rw-r--r--activejob/lib/rails/generators/job/job_generator.rb33
-rw-r--r--activejob/lib/rails/generators/job/templates/application_job.rb.tt9
-rw-r--r--activejob/lib/rails/generators/job/templates/job.rb.tt (renamed from activejob/lib/rails/generators/job/templates/job.rb)0
45 files changed, 1623 insertions, 746 deletions
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb
index eb8091a805..01fab4d918 100644
--- a/activejob/lib/active_job.rb
+++ b/activejob/lib/active_job.rb
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
#--
-# Copyright (c) 2014-2015 David Heinemeier Hansson
+# Copyright (c) 2014-2018 David Heinemeier Hansson
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@@ -21,18 +23,18 @@
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#++
-require 'active_support'
-require 'active_support/rails'
-require 'active_job/version'
-require 'global_id'
+require "active_support"
+require "active_support/rails"
+require "active_job/version"
+require "global_id"
module ActiveJob
extend ActiveSupport::Autoload
autoload :Base
autoload :QueueAdapters
+ autoload :Serializers
autoload :ConfiguredJob
- autoload :AsyncJob
autoload :TestCase
autoload :TestHelper
end
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb
index e56bc79328..fa58c50ed0 100644
--- a/activejob/lib/active_job/arguments.rb
+++ b/activejob/lib/active_job/arguments.rb
@@ -1,51 +1,43 @@
-require 'active_support/core_ext/hash'
+# frozen_string_literal: true
+
+require "active_support/core_ext/hash"
module ActiveJob
# Raised when an exception is raised during job arguments deserialization.
#
# Wraps the original exception raised as +cause+.
class DeserializationError < StandardError
- def initialize(e = nil) #:nodoc:
- if e
- ActiveSupport::Deprecation.warn("Passing #original_exception is deprecated and has no effect. " \
- "Exceptions will automatically capture the original exception.", caller)
- end
-
+ def initialize #:nodoc:
super("Error while trying to deserialize arguments: #{$!.message}")
set_backtrace $!.backtrace
end
-
- # The original exception that was raised during deserialization of job
- # arguments.
- def original_exception
- ActiveSupport::Deprecation.warn("#original_exception is deprecated. Use #cause instead.", caller)
- cause
- end
end
# Raised when an unsupported argument type is set as a job argument. We
- # currently support NilClass, Fixnum, Float, String, TrueClass, FalseClass,
- # Bignum and objects that can be represented as GlobalIDs (ex: Active Record).
+ # currently support String, Integer, Float, NilClass, TrueClass, FalseClass,
+ # BigDecimal, Symbol, Date, Time, DateTime, ActiveSupport::TimeWithZone,
+ # ActiveSupport::Duration, Hash, ActiveSupport::HashWithIndifferentAccess,
+ # Array or GlobalID::Identification instances, although this can be extended
+ # by adding custom serializers.
# Raised if you set the key for a Hash something else than a string or
# a symbol. Also raised when trying to serialize an object which can't be
- # identified with a Global ID - such as an unpersisted Active Record model.
+ # identified with a GlobalID - such as an unpersisted Active Record model.
class SerializationError < ArgumentError; end
module Arguments
extend self
- # :nodoc:
- TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ]
-
- # Serializes a set of arguments. Whitelisted types are returned
- # as-is. Arrays/Hashes are serialized element by element.
- # All other types are serialized using GlobalID.
+ # Serializes a set of arguments. Intrinsic types that can safely be
+ # serialized without mutation are returned as-is. Arrays/Hashes are
+ # serialized element by element. All other types are serialized using
+ # GlobalID.
def serialize(arguments)
arguments.map { |argument| serialize_argument(argument) }
end
- # Deserializes a set of arguments. Whitelisted types are returned
- # as-is. Arrays/Hashes are deserialized element by element.
- # All other types are deserialized using GlobalID.
+ # Deserializes a set of arguments. Intrinsic types that can safely be
+ # deserialized without mutation are returned as-is. Arrays/Hashes are
+ # deserialized element by element. All other types are deserialized using
+ # GlobalID.
def deserialize(arguments)
arguments.map { |argument| deserialize_argument(argument) }
rescue
@@ -53,33 +45,46 @@ module ActiveJob
end
private
+
+ # :nodoc:
+ PERMITTED_TYPES = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ]
# :nodoc:
- GLOBALID_KEY = '_aj_globalid'.freeze
+ GLOBALID_KEY = "_aj_globalid"
# :nodoc:
- SYMBOL_KEYS_KEY = '_aj_symbol_keys'.freeze
+ SYMBOL_KEYS_KEY = "_aj_symbol_keys"
# :nodoc:
- WITH_INDIFFERENT_ACCESS_KEY = '_aj_hash_with_indifferent_access'.freeze
- private_constant :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
+ WITH_INDIFFERENT_ACCESS_KEY = "_aj_hash_with_indifferent_access"
+ # :nodoc:
+ OBJECT_SERIALIZER_KEY = "_aj_serialized"
+
+ # :nodoc:
+ RESERVED_KEYS = [
+ GLOBALID_KEY, GLOBALID_KEY.to_sym,
+ SYMBOL_KEYS_KEY, SYMBOL_KEYS_KEY.to_sym,
+ OBJECT_SERIALIZER_KEY, OBJECT_SERIALIZER_KEY.to_sym,
+ WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym,
+ ]
+ private_constant :PERMITTED_TYPES, :RESERVED_KEYS, :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
def serialize_argument(argument)
case argument
- when *TYPE_WHITELIST
+ when *PERMITTED_TYPES
argument
when GlobalID::Identification
convert_to_global_id_hash(argument)
when Array
argument.map { |arg| serialize_argument(arg) }
when ActiveSupport::HashWithIndifferentAccess
- result = serialize_hash(argument)
- result[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true)
- result
+ serialize_indifferent_hash(argument)
when Hash
symbol_keys = argument.each_key.grep(Symbol).map(&:to_s)
result = serialize_hash(argument)
result[SYMBOL_KEYS_KEY] = symbol_keys
result
+ when -> (arg) { arg.respond_to?(:permitted?) }
+ serialize_indifferent_hash(argument.to_h)
else
- raise SerializationError.new("Unsupported argument type: #{argument.class.name}")
+ Serializers.serialize(argument)
end
end
@@ -87,13 +92,15 @@ module ActiveJob
case argument
when String
GlobalID::Locator.locate(argument) || argument
- when *TYPE_WHITELIST
+ when *PERMITTED_TYPES
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
when Hash
if serialized_global_id?(argument)
deserialize_global_id argument
+ elsif custom_serialized?(argument)
+ Serializers.deserialize(argument)
else
deserialize_hash(argument)
end
@@ -103,13 +110,17 @@ module ActiveJob
end
def serialized_global_id?(hash)
- hash.size == 1 and hash.include?(GLOBALID_KEY)
+ hash.size == 1 && hash.include?(GLOBALID_KEY)
end
def deserialize_global_id(hash)
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
+ def custom_serialized?(hash)
+ hash.key?(OBJECT_SERIALIZER_KEY)
+ end
+
def serialize_hash(argument)
argument.each_with_object({}) do |(key, value), hash|
hash[serialize_hash_key(key)] = serialize_argument(value)
@@ -126,14 +137,6 @@ module ActiveJob
result
end
- # :nodoc:
- RESERVED_KEYS = [
- GLOBALID_KEY, GLOBALID_KEY.to_sym,
- SYMBOL_KEYS_KEY, SYMBOL_KEYS_KEY.to_sym,
- WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym,
- ]
- private_constant :RESERVED_KEYS
-
def serialize_hash_key(key)
case key
when *RESERVED_KEYS
@@ -145,8 +148,17 @@ module ActiveJob
end
end
+ def serialize_indifferent_hash(indifferent_hash)
+ result = serialize_hash(indifferent_hash)
+ result[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true)
+ result
+ end
+
def transform_symbol_keys(hash, symbol_keys)
- hash.transform_keys do |key|
+ # NOTE: HashWithIndifferentAccess#transform_keys always
+ # returns stringified keys with indifferent access
+ # so we call #to_h here to ensure keys are symbolized.
+ hash.to_h.transform_keys do |key|
if symbol_keys.include?(key)
key.to_sym
else
diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb
deleted file mode 100644
index ed7a6e8d9b..0000000000
--- a/activejob/lib/active_job/async_job.rb
+++ /dev/null
@@ -1,77 +0,0 @@
-require 'concurrent/map'
-require 'concurrent/scheduled_task'
-require 'concurrent/executor/thread_pool_executor'
-require 'concurrent/utility/processor_counter'
-
-module ActiveJob
- # == Active Job Async Job
- #
- # When enqueueing jobs with Async Job each job will be executed asynchronously
- # on a +concurrent-ruby+ thread pool. All job data is retained in memory.
- # Because job data is not saved to a persistent datastore there is no
- # additional infrastructure needed and jobs process quickly. The lack of
- # persistence, however, means that all unprocessed jobs will be lost on
- # application restart. Therefore in-memory queue adapters are unsuitable for
- # most production environments but are excellent for development and testing.
- #
- # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
- #
- # To use Async Job set the queue_adapter config to +:async+.
- #
- # Rails.application.config.active_job.queue_adapter = :async
- #
- # Async Job supports job queues specified with +queue_as+. Queues are created
- # automatically as needed and each has its own thread pool.
- class AsyncJob
-
- DEFAULT_EXECUTOR_OPTIONS = {
- min_threads: [2, Concurrent.processor_count].max,
- max_threads: Concurrent.processor_count * 10,
- auto_terminate: true,
- idletime: 60, # 1 minute
- max_queue: 0, # unlimited
- fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
- }.freeze
-
- QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
- hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
- end
-
- class << self
- # Forces jobs to process immediately when testing the Active Job gem.
- # This should only be called from within unit tests.
- def perform_immediately! #:nodoc:
- @perform_immediately = true
- end
-
- # Allows jobs to run asynchronously when testing the Active Job gem.
- # This should only be called from within unit tests.
- def perform_asynchronously! #:nodoc:
- @perform_immediately = false
- end
-
- def create_thread_pool #:nodoc:
- if @perform_immediately
- Concurrent::ImmediateExecutor.new
- else
- Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
- end
- end
-
- def enqueue(job_data, queue: 'default') #:nodoc:
- QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) }
- end
-
- def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
- delay = timestamp - Time.current.to_f
- if delay > 0
- Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job|
- ActiveJob::Base.execute(job)
- end
- else
- enqueue(job_data, queue: queue)
- end
- end
- end
- end
-end
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index ff5c69ddc6..ed41fac4b8 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -1,12 +1,16 @@
-require 'active_job/core'
-require 'active_job/queue_adapter'
-require 'active_job/queue_name'
-require 'active_job/queue_priority'
-require 'active_job/enqueuing'
-require 'active_job/execution'
-require 'active_job/callbacks'
-require 'active_job/logging'
-require 'active_job/translation'
+# frozen_string_literal: true
+
+require "active_job/core"
+require "active_job/queue_adapter"
+require "active_job/queue_name"
+require "active_job/queue_priority"
+require "active_job/enqueuing"
+require "active_job/execution"
+require "active_job/callbacks"
+require "active_job/exceptions"
+require "active_job/logging"
+require "active_job/timezones"
+require "active_job/translation"
module ActiveJob #:nodoc:
# = Active Job
@@ -36,7 +40,7 @@ module ActiveJob #:nodoc:
# Records that are passed in are serialized/deserialized using Global
# ID. More information can be found in Arguments.
#
- # To enqueue a job to be performed as soon as the queueing system is free:
+ # To enqueue a job to be performed as soon as the queuing system is free:
#
# ProcessPhotoJob.perform_later(photo)
#
@@ -62,7 +66,9 @@ module ActiveJob #:nodoc:
include Enqueuing
include Execution
include Callbacks
+ include Exceptions
include Logging
+ include Timezones
include Translation
ActiveSupport.run_load_hooks(:active_job, self)
diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb
index 2b6149e84e..61317c7cfc 100644
--- a/activejob/lib/active_job/callbacks.rb
+++ b/activejob/lib/active_job/callbacks.rb
@@ -1,10 +1,12 @@
-require 'active_support/callbacks'
+# frozen_string_literal: true
+
+require "active_support/callbacks"
module ActiveJob
# = Active Job Callbacks
#
# Active Job provides hooks during the life cycle of a job. Callbacks allow you
- # to trigger logic during the life cycle of a job. Available callbacks are:
+ # to trigger logic during this cycle. Available callbacks are:
#
# * <tt>before_enqueue</tt>
# * <tt>around_enqueue</tt>
@@ -13,10 +15,17 @@ module ActiveJob
# * <tt>around_perform</tt>
# * <tt>after_perform</tt>
#
+ # NOTE: Calling the same callback multiple times will overwrite previous callback definitions.
+ #
module Callbacks
extend ActiveSupport::Concern
include ActiveSupport::Callbacks
+ class << self
+ include ActiveSupport::Callbacks
+ define_callbacks :execute
+ end
+
included do
define_callbacks :perform
define_callbacks :enqueue
@@ -121,8 +130,8 @@ module ActiveJob
set_callback(:enqueue, :after, *filters, &blk)
end
- # Defines a callback that will get called before and after the
- # job is enqueued.
+ # Defines a callback that will get called around the enqueuing
+ # of the job.
#
# class VideoProcessJob < ActiveJob::Base
# queue_as :default
diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb
index 979280b910..67daf48b36 100644
--- a/activejob/lib/active_job/configured_job.rb
+++ b/activejob/lib/active_job/configured_job.rb
@@ -1,6 +1,8 @@
+# frozen_string_literal: true
+
module ActiveJob
class ConfiguredJob #:nodoc:
- def initialize(job_class, options={})
+ def initialize(job_class, options = {})
@options = options
@job_class = job_class
end
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index 19b900a285..698153636b 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -1,39 +1,45 @@
+# frozen_string_literal: true
+
module ActiveJob
# Provides general behavior that will be included into every Active Job
# object that inherits from ActiveJob::Base.
module Core
extend ActiveSupport::Concern
- included do
- # Job arguments
- attr_accessor :arguments
- attr_writer :serialized_arguments
+ # Job arguments
+ attr_accessor :arguments
+ attr_writer :serialized_arguments
- # Timestamp when the job should be performed
- attr_accessor :scheduled_at
+ # Timestamp when the job should be performed
+ attr_accessor :scheduled_at
- # Job Identifier
- attr_accessor :job_id
+ # Job Identifier
+ attr_accessor :job_id
- # Queue in which the job will reside.
- attr_writer :queue_name
+ # Queue in which the job will reside.
+ attr_writer :queue_name
- # Priority that the job will have (lower is more priority).
- attr_writer :priority
+ # Priority that the job will have (lower is more priority).
+ attr_writer :priority
- # ID optionally provided by adapter
- attr_accessor :provider_job_id
+ # ID optionally provided by adapter
+ attr_accessor :provider_job_id
- # I18n.locale to be used during the job.
- attr_accessor :locale
- end
+ # Number of times this job has been executed (which increments on every retry, like after an exception).
+ attr_accessor :executions
+
+ # I18n.locale to be used during the job.
+ attr_accessor :locale
+
+ # Timezone to be used during the job.
+ attr_accessor :timezone
# These methods will be included into any Active Job object, adding
# helpers for de/serialization and creation of job instances.
module ClassMethods
# Creates a new job instance from a hash created with +serialize+
def deserialize(job_data)
- job = job_data['job_class'].constantize.new
+ job = job_data["job_class"].constantize.new
job.deserialize(job_data)
job
end
@@ -56,7 +62,7 @@ module ActiveJob
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
- def set(options={})
+ def set(options = {})
ConfiguredJob.new(self, options)
end
end
@@ -68,18 +74,22 @@ module ActiveJob
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@priority = self.class.priority
+ @executions = 0
end
# Returns a hash with the job data that can safely be passed to the
- # queueing adapter.
+ # queuing adapter.
def serialize
{
- 'job_class' => self.class.name,
- 'job_id' => job_id,
- 'queue_name' => queue_name,
- 'priority' => priority,
- 'arguments' => serialize_arguments(arguments),
- 'locale' => I18n.locale
+ "job_class" => self.class.name,
+ "job_id" => job_id,
+ "provider_job_id" => provider_job_id,
+ "queue_name" => queue_name,
+ "priority" => priority,
+ "arguments" => serialize_arguments_if_needed(arguments),
+ "executions" => executions,
+ "locale" => I18n.locale.to_s,
+ "timezone" => Time.zone.try(:name)
}
end
@@ -89,42 +99,63 @@ module ActiveJob
# ==== Examples
#
# class DeliverWebhookJob < ActiveJob::Base
+ # attr_writer :attempt_number
+ #
+ # def attempt_number
+ # @attempt_number ||= 0
+ # end
+ #
# def serialize
- # super.merge('attempt_number' => (@attempt_number || 0) + 1)
+ # super.merge('attempt_number' => attempt_number + 1)
# end
#
# def deserialize(job_data)
# super
- # @attempt_number = job_data['attempt_number']
+ # self.attempt_number = job_data['attempt_number']
# end
#
- # rescue_from(TimeoutError) do |exception|
- # raise exception if @attempt_number > 5
+ # rescue_from(Timeout::Error) do |exception|
+ # raise exception if attempt_number > 5
# retry_job(wait: 10)
# end
# end
def deserialize(job_data)
- self.job_id = job_data['job_id']
- self.queue_name = job_data['queue_name']
- self.priority = job_data['priority']
- self.serialized_arguments = job_data['arguments']
- self.locale = job_data['locale'] || I18n.locale
+ self.job_id = job_data["job_id"]
+ self.provider_job_id = job_data["provider_job_id"]
+ self.queue_name = job_data["queue_name"]
+ self.priority = job_data["priority"]
+ self.serialized_arguments = job_data["arguments"]
+ self.executions = job_data["executions"]
+ self.locale = job_data["locale"] || I18n.locale.to_s
+ self.timezone = job_data["timezone"] || Time.zone.try(:name)
end
private
+ def serialize_arguments_if_needed(arguments)
+ if arguments_serialized?
+ @serialized_arguments
+ else
+ serialize_arguments(arguments)
+ end
+ end
+
def deserialize_arguments_if_needed
- if defined?(@serialized_arguments) && @serialized_arguments.present?
+ if arguments_serialized?
@arguments = deserialize_arguments(@serialized_arguments)
@serialized_arguments = nil
end
end
- def serialize_arguments(serialized_args)
- Arguments.serialize(serialized_args)
+ def serialize_arguments(arguments)
+ Arguments.serialize(arguments)
end
def deserialize_arguments(serialized_args)
Arguments.deserialize(serialized_args)
end
+
+ def arguments_serialized?
+ defined?(@serialized_arguments) && @serialized_arguments
+ end
end
end
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 22154457fd..b5b9f23c00 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -1,16 +1,20 @@
-require 'active_job/arguments'
+# frozen_string_literal: true
+
+require "active_job/arguments"
module ActiveJob
- # Provides behavior for enqueuing and retrying jobs.
+ # Provides behavior for enqueuing jobs.
module Enqueuing
extend ActiveSupport::Concern
# Includes the +perform_later+ method for job initialization.
module ClassMethods
- # Push a job onto the queue. The arguments must be legal JSON types
- # (string, int, float, nil, true, false, hash or array) or
- # GlobalID::Identification instances. Arbitrary Ruby objects
- # are not supported.
+ # Push a job onto the queue. By default the arguments must be either String,
+ # Integer, Float, NilClass, TrueClass, FalseClass, BigDecimal, Symbol, Date,
+ # Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration,
+ # Hash, ActiveSupport::HashWithIndifferentAccess, Array or
+ # GlobalID::Identification instances, although this can be extended by adding
+ # custom serializers.
#
# Returns an instance of the job class queued with arguments available in
# Job#arguments.
@@ -18,37 +22,12 @@ module ActiveJob
job_or_instantiate(*args).enqueue
end
- protected
- def job_or_instantiate(*args)
+ private
+ def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
end
- # Reschedules the job to be re-executed. This is useful in combination
- # with the +rescue_from+ option. When you rescue an exception from your job
- # you can ask Active Job to retry performing your job.
- #
- # ==== Options
- # * <tt>:wait</tt> - Enqueues the job with the specified delay
- # * <tt>:wait_until</tt> - Enqueues the job at the time specified
- # * <tt>:queue</tt> - Enqueues the job on the specified queue
- # * <tt>:priority</tt> - Enqueues the job with the specified priority
- #
- # ==== Examples
- #
- # class SiteScrapperJob < ActiveJob::Base
- # rescue_from(ErrorLoadingSite) do
- # retry_job queue: :low_priority
- # end
- #
- # def perform(*args)
- # # raise ErrorLoadingSite if cannot scrape
- # end
- # end
- def retry_job(options={})
- enqueue options
- end
-
# Enqueues the job to be performed by the queue adapter.
#
# ==== Options
@@ -64,14 +43,14 @@ module ActiveJob
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
# my_job_instance.enqueue priority: 10
- def enqueue(options={})
+ def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
run_callbacks :enqueue do
- if self.scheduled_at
- self.class.queue_adapter.enqueue_at self, self.scheduled_at
+ if scheduled_at
+ self.class.queue_adapter.enqueue_at self, scheduled_at
else
self.class.queue_adapter.enqueue self
end
diff --git a/activejob/lib/active_job/exceptions.rb b/activejob/lib/active_job/exceptions.rb
new file mode 100644
index 0000000000..bc9e168971
--- /dev/null
+++ b/activejob/lib/active_job/exceptions.rb
@@ -0,0 +1,143 @@
+# frozen_string_literal: true
+
+require "active_support/core_ext/numeric/time"
+
+module ActiveJob
+ # Provides behavior for retrying and discarding jobs on exceptions.
+ module Exceptions
+ extend ActiveSupport::Concern
+
+ module ClassMethods
+ # Catch the exception and reschedule job for re-execution after so many seconds, for a specific number of attempts.
+ # The number of attempts includes the total executions of a job, not just the retried executions.
+ # If the exception keeps getting raised beyond the specified number of attempts, the exception is allowed to
+ # bubble up to the underlying queuing system, which may have its own retry mechanism or place it in a
+ # holding queue for inspection.
+ #
+ # You can also pass a block that'll be invoked if the retry attempts fail for custom logic rather than letting
+ # the exception bubble up. This block is yielded with the job instance as the first and the error instance as the second parameter.
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Re-enqueues the job with a delay specified either in seconds (default: 3 seconds),
+ # as a computing proc that the number of executions so far as an argument, or as a symbol reference of
+ # <tt>:exponentially_longer</tt>, which applies the wait algorithm of <tt>(executions ** 4) + 2</tt>
+ # (first wait 3s, then 18s, then 83s, etc)
+ # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts),
+ # attempts here refers to the total number of times the job is executed, not just retried executions
+ # * <tt>:queue</tt> - Re-enqueues the job on a different queue
+ # * <tt>:priority</tt> - Re-enqueues the job with a different priority
+ #
+ # ==== Examples
+ #
+ # class RemoteServiceJob < ActiveJob::Base
+ # retry_on CustomAppException # defaults to 3s wait, 5 attempts
+ # retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 }
+ # retry_on(YetAnotherCustomAppException) do |job, error|
+ # ExceptionNotifier.caught(error)
+ # end
+ # retry_on ActiveRecord::Deadlocked, wait: 5.seconds, attempts: 3
+ # retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10
+ #
+ # def perform(*args)
+ # # Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific
+ # # Might raise ActiveRecord::Deadlocked when a local db deadlock is detected
+ # # Might raise Net::OpenTimeout when the remote service is down
+ # end
+ # end
+ def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil)
+ rescue_from(*exceptions) do |error|
+ if executions < attempts
+ retry_job wait: determine_delay(wait), queue: queue, priority: priority, error: error
+ else
+ if block_given?
+ instrument :retry_stopped, error: error do
+ yield self, error
+ end
+ else
+ instrument :retry_stopped, error: error
+ raise error
+ end
+ end
+ end
+ end
+
+ # Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job,
+ # like an Active Record, is no longer available, and the job is thus no longer relevant.
+ #
+ # You can also pass a block that'll be invoked. This block is yielded with the job instance as the first and the error instance as the second parameter.
+ #
+ # ==== Example
+ #
+ # class SearchIndexingJob < ActiveJob::Base
+ # discard_on ActiveJob::DeserializationError
+ # discard_on(CustomAppException) do |job, error|
+ # ExceptionNotifier.caught(error)
+ # end
+ #
+ # def perform(record)
+ # # Will raise ActiveJob::DeserializationError if the record can't be deserialized
+ # # Might raise CustomAppException for something domain specific
+ # end
+ # end
+ def discard_on(*exceptions)
+ rescue_from(*exceptions) do |error|
+ instrument :discard, error: error do
+ yield self, error if block_given?
+ end
+ end
+ end
+ end
+
+ # Reschedules the job to be re-executed. This is useful in combination
+ # with the +rescue_from+ option. When you rescue an exception from your job
+ # you can ask Active Job to retry performing your job.
+ #
+ # ==== Options
+ # * <tt>:wait</tt> - Enqueues the job with the specified delay in seconds
+ # * <tt>:wait_until</tt> - Enqueues the job at the time specified
+ # * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
+ #
+ # ==== Examples
+ #
+ # class SiteScraperJob < ActiveJob::Base
+ # rescue_from(ErrorLoadingSite) do
+ # retry_job queue: :low_priority
+ # end
+ #
+ # def perform(*args)
+ # # raise ErrorLoadingSite if cannot scrape
+ # end
+ # end
+ def retry_job(options = {})
+ instrument :enqueue_retry, options.slice(:error, :wait) do
+ enqueue options
+ end
+ end
+
+ private
+ def determine_delay(seconds_or_duration_or_algorithm)
+ case seconds_or_duration_or_algorithm
+ when :exponentially_longer
+ (executions**4) + 2
+ when ActiveSupport::Duration
+ duration = seconds_or_duration_or_algorithm
+ duration.to_i
+ when Integer
+ seconds = seconds_or_duration_or_algorithm
+ seconds
+ when Proc
+ algorithm = seconds_or_duration_or_algorithm
+ algorithm.call(executions)
+ else
+ raise "Couldn't determine a delay based on #{seconds_or_duration_or_algorithm.inspect}"
+ end
+ end
+
+ def instrument(name, error: nil, wait: nil, &block)
+ payload = { job: self, adapter: self.class.queue_adapter, error: error, wait: wait }
+
+ ActiveSupport::Notifications.instrument("#{name}.active_job", payload, &block)
+ end
+ end
+end
diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb
index 79d232da4a..e96dbcd4c9 100644
--- a/activejob/lib/active_job/execution.rb
+++ b/activejob/lib/active_job/execution.rb
@@ -1,5 +1,7 @@
-require 'active_support/rescuable'
-require 'active_job/arguments'
+# frozen_string_literal: true
+
+require "active_support/rescuable"
+require "active_job/arguments"
module ActiveJob
module Execution
@@ -17,22 +19,27 @@ module ActiveJob
end
def execute(job_data) #:nodoc:
- job = deserialize(job_data)
- job.perform_now
+ ActiveJob::Callbacks.run_callbacks(:execute) do
+ job = deserialize(job_data)
+ job.perform_now
+ end
end
end
- # Performs the job immediately. The job is not sent to the queueing adapter
+ # Performs the job immediately. The job is not sent to the queuing adapter
# but directly executed by blocking the execution of others until it's finished.
#
# MyJob.new(*args).perform_now
def perform_now
+ # Guard against jobs that were persisted before we started counting executions by zeroing out nil counters
+ self.executions = (executions || 0) + 1
+
deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
rescue => exception
- rescue_with_handler(exception) || raise(exception)
+ rescue_with_handler(exception) || raise
end
def perform(*)
diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb
index 27a5de93f4..770f70dc5e 100644
--- a/activejob/lib/active_job/gem_version.rb
+++ b/activejob/lib/active_job/gem_version.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
module ActiveJob
# Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt>
def self.gem_version
@@ -5,7 +7,7 @@ module ActiveJob
end
module VERSION
- MAJOR = 5
+ MAJOR = 6
MINOR = 0
TINY = 0
PRE = "alpha"
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index 605057d1e8..416be83c24 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -1,24 +1,25 @@
-require 'active_support/core_ext/hash/transform_values'
-require 'active_support/core_ext/string/filters'
-require 'active_support/tagged_logging'
-require 'active_support/logger'
+# frozen_string_literal: true
+
+require "active_support/core_ext/string/filters"
+require "active_support/tagged_logging"
+require "active_support/logger"
module ActiveJob
module Logging #:nodoc:
extend ActiveSupport::Concern
included do
- cattr_accessor(:logger) { ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT)) }
+ cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
- around_enqueue do |_, block, _|
+ around_enqueue do |_, block|
tag_logger do
block.call
end
end
- around_perform do |job, block, _|
+ around_perform do |job, block|
tag_logger(job.class.name, job.job_id) do
- payload = {adapter: job.class.queue_adapter, job: job}
+ payload = { adapter: job.class.queue_adapter, job: job }
ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup)
ActiveSupport::Notifications.instrument("perform.active_job", payload) do
block.call
@@ -26,13 +27,13 @@ module ActiveJob
end
end
- after_enqueue do |job|
+ around_enqueue do |job, block|
if job.scheduled_at
- ActiveSupport::Notifications.instrument "enqueue_at.active_job",
- adapter: job.class.queue_adapter, job: job
+ ActiveSupport::Notifications.instrument("enqueue_at.active_job",
+ adapter: job.class.queue_adapter, job: job, &block)
else
- ActiveSupport::Notifications.instrument "enqueue.active_job",
- adapter: job.class.queue_adapter, job: job
+ ActiveSupport::Notifications.instrument("enqueue.active_job",
+ adapter: job.class.queue_adapter, job: job, &block)
end
end
end
@@ -41,7 +42,7 @@ module ActiveJob
def tag_logger(*tags)
if logger.respond_to?(:tagged)
tags.unshift "ActiveJob" unless logger_tagged_by_active_job?
- ActiveJob::Base.logger.tagged(*tags){ yield }
+ logger.tagged(*tags) { yield }
else
yield
end
@@ -51,70 +52,109 @@ module ActiveJob
logger.formatter.current_tags.include?("ActiveJob")
end
- class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
- def enqueue(event)
- info do
- job = event.payload[:job]
- "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
+ class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
+ def enqueue(event)
+ info do
+ job = event.payload[:job]
+ "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
+ end
end
- end
- def enqueue_at(event)
- info do
- job = event.payload[:job]
- "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
+ def enqueue_at(event)
+ info do
+ job = event.payload[:job]
+ "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
+ end
end
- end
- def perform_start(event)
- info do
- job = event.payload[:job]
- "Performing #{job.class.name} from #{queue_name(event)}" + args_info(job)
+ def perform_start(event)
+ info do
+ job = event.payload[:job]
+ "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)}" + args_info(job)
+ end
end
- end
- def perform(event)
- info do
+ def perform(event)
job = event.payload[:job]
- "Performed #{job.class.name} from #{queue_name(event)} in #{event.duration.round(2)}ms"
+ ex = event.payload[:exception_object]
+ if ex
+ error do
+ "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
+ end
+ else
+ info do
+ "Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
+ end
+ end
end
- end
- private
- def queue_name(event)
- event.payload[:adapter].class.name.demodulize.remove('Adapter') + "(#{event.payload[:job].queue_name})"
+ def enqueue_retry(event)
+ job = event.payload[:job]
+ ex = event.payload[:error]
+ wait = event.payload[:wait]
+
+ info do
+ if ex
+ "Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
+ else
+ "Retrying #{job.class} in #{wait.to_i} seconds."
+ end
+ end
end
- def args_info(job)
- if job.arguments.any?
- ' with arguments: ' +
- job.arguments.map { |arg| format(arg).inspect }.join(', ')
- else
- ''
+ def retry_stopped(event)
+ job = event.payload[:job]
+ ex = event.payload[:error]
+
+ error do
+ "Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts."
end
end
- def format(arg)
- case arg
- when Hash
- arg.transform_values { |value| format(value) }
- when Array
- arg.map { |value| format(value) }
- when GlobalID::Identification
- arg.to_global_id rescue arg
- else
- arg
+ def discard(event)
+ job = event.payload[:job]
+ ex = event.payload[:error]
+
+ error do
+ "Discarded #{job.class} due to a #{ex.class}."
end
end
- def scheduled_at(event)
- Time.at(event.payload[:job].scheduled_at).utc
- end
+ private
+ def queue_name(event)
+ event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
+ end
- def logger
- ActiveJob::Base.logger
- end
- end
+ def args_info(job)
+ if job.arguments.any?
+ " with arguments: " +
+ job.arguments.map { |arg| format(arg).inspect }.join(", ")
+ else
+ ""
+ end
+ end
+
+ def format(arg)
+ case arg
+ when Hash
+ arg.transform_values { |value| format(value) }
+ when Array
+ arg.map { |value| format(value) }
+ when GlobalID::Identification
+ arg.to_global_id rescue arg
+ else
+ arg
+ end
+ end
+
+ def scheduled_at(event)
+ Time.at(event.payload[:job].scheduled_at).utc
+ end
+
+ def logger
+ ActiveJob::Base.logger
+ end
+ end
end
end
diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb
index 457015b741..954bfd1dd1 100644
--- a/activejob/lib/active_job/queue_adapter.rb
+++ b/activejob/lib/active_job/queue_adapter.rb
@@ -1,63 +1,62 @@
-require 'active_job/queue_adapters/inline_adapter'
-require 'active_support/core_ext/class/attribute'
-require 'active_support/core_ext/string/inflections'
+# frozen_string_literal: true
+
+require "active_support/core_ext/string/inflections"
module ActiveJob
# The <tt>ActiveJob::QueueAdapter</tt> module is used to load the
- # correct adapter. The default queue adapter is the +:inline+ queue.
+ # correct adapter. The default queue adapter is the +:async+ queue.
module QueueAdapter #:nodoc:
extend ActiveSupport::Concern
included do
+ class_attribute :_queue_adapter_name, instance_accessor: false, instance_predicate: false
class_attribute :_queue_adapter, instance_accessor: false, instance_predicate: false
- self.queue_adapter = :inline
+ self.queue_adapter = :async
end
# Includes the setter method for changing the active queue adapter.
module ClassMethods
# Returns the backend queue provider. The default queue adapter
- # is the +:inline+ queue. See QueueAdapters for more information.
+ # is the +:async+ queue. See QueueAdapters for more information.
def queue_adapter
_queue_adapter
end
- # Specify the backend queue provider. The default queue adapter
- # is the +:inline+ queue. See QueueAdapters for more
- # information.
- def queue_adapter=(name_or_adapter_or_class)
- self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
+ # Returns string denoting the name of the configured queue adapter.
+ # By default returns +"async"+.
+ def queue_adapter_name
+ _queue_adapter_name
end
- private
-
- def interpret_adapter(name_or_adapter_or_class)
- case name_or_adapter_or_class
+ # Specify the backend queue provider. The default queue adapter
+ # is the +:async+ queue. See QueueAdapters for more
+ # information.
+ def queue_adapter=(name_or_adapter)
+ case name_or_adapter
when Symbol, String
- ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
+ queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
+ assign_adapter(name_or_adapter.to_s, queue_adapter)
else
- if queue_adapter?(name_or_adapter_or_class)
- name_or_adapter_or_class
- elsif queue_adapter_class?(name_or_adapter_or_class)
- ActiveSupport::Deprecation.warn "Passing an adapter class is deprecated " \
- "and will be removed in Rails 5.1. Please pass an adapter name " \
- "(.queue_adapter = :#{name_or_adapter_or_class.name.demodulize.remove('Adapter').underscore}) " \
- "or an instance (.queue_adapter = #{name_or_adapter_or_class.name}.new) instead."
- name_or_adapter_or_class.new
+ if queue_adapter?(name_or_adapter)
+ adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
+ assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
end
end
end
- QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze
+ private
+ def assign_adapter(adapter_name, queue_adapter)
+ self._queue_adapter_name = adapter_name
+ self._queue_adapter = queue_adapter
+ end
- def queue_adapter?(object)
- QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
- end
+ QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze
- def queue_adapter_class?(object)
- object.is_a?(Class) && QUEUE_ADAPTER_METHODS.all? { |meth| object.public_method_defined?(meth) }
- end
+ def queue_adapter?(object)
+ QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) }
+ end
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index aeb1fe1e73..525e79e302 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -1,19 +1,21 @@
+# frozen_string_literal: true
+
module ActiveJob
# == Active Job adapters
#
- # Active Job has adapters for the following queueing backends:
+ # Active Job has adapters for the following queuing backends:
#
# * {Backburner}[https://github.com/nesquena/backburner]
# * {Delayed Job}[https://github.com/collectiveidea/delayed_job]
- # * {Qu}[https://github.com/bkeepers/qu]
# * {Que}[https://github.com/chanks/que]
# * {queue_classic}[https://github.com/QueueClassic/queue_classic]
- # * {Resque 1.x}[https://github.com/resque/resque/tree/1-x-stable]
+ # * {Resque}[https://github.com/resque/resque]
# * {Sidekiq}[http://sidekiq.org]
# * {Sneakers}[https://github.com/jondot/sneakers]
# * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
# * {Active Job Async Job}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html]
# * {Active Job Inline}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html]
+ # * Please Note: We are not accepting pull requests for new adapters. See the {README}[link:files/activejob/README_md.html] for more details.
#
# === Backends Features
#
@@ -21,19 +23,19 @@ module ActiveJob
# |-------------------|-------|--------|------------|------------|---------|---------|
# | Backburner | Yes | Yes | Yes | Yes | Job | Global |
# | Delayed Job | Yes | Yes | Yes | Job | Global | Global |
- # | Qu | Yes | Yes | No | No | No | Global |
# | Que | Yes | Yes | Yes | Job | No | Job |
# | queue_classic | Yes | Yes | Yes* | No | No | No |
# | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes |
# | Sidekiq | Yes | Yes | Yes | Queue | No | Job |
# | Sneakers | Yes | Yes | No | Queue | Queue | No |
- # | Sucker Punch | Yes | Yes | No | No | No | No |
+ # | Sucker Punch | Yes | Yes | Yes | No | No | No |
# | Active Job Async | Yes | Yes | Yes | No | No | No |
# | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
#
# ==== Async
#
- # Yes: The Queue Adapter runs the jobs in a separate or forked process.
+ # Yes: The Queue Adapter has the ability to run the job in a non-blocking manner.
+ # It either runs on a separate or forked process, or on a different thread.
#
# No: The job is run in the same process.
#
@@ -50,7 +52,7 @@ module ActiveJob
#
# No: The adapter will run jobs at the next opportunity and cannot use perform_later.
#
- # N/A: The adapter does not support queueing.
+ # N/A: The adapter does not support queuing.
#
# NOTE:
# queue_classic supports job scheduling since version 3.1.
@@ -72,7 +74,7 @@ module ActiveJob
#
# No: Does not allow the priority of jobs to be configured.
#
- # N/A: The adapter does not support queueing, and therefore sorting them.
+ # N/A: The adapter does not support queuing, and therefore sorting them.
#
# ==== Timeout
#
@@ -111,7 +113,6 @@ module ActiveJob
autoload :InlineAdapter
autoload :BackburnerAdapter
autoload :DelayedJobAdapter
- autoload :QuAdapter
autoload :QueAdapter
autoload :QueueClassicAdapter
autoload :ResqueAdapter
@@ -120,7 +121,7 @@ module ActiveJob
autoload :SuckerPunchAdapter
autoload :TestAdapter
- ADAPTER = 'Adapter'.freeze
+ ADAPTER = "Adapter"
private_constant :ADAPTER
class << self
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb
index 3fc27f56e7..53a7e3d53e 100644
--- a/activejob/lib/active_job/queue_adapters/async_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb
@@ -1,22 +1,115 @@
-require 'active_job/async_job'
+# frozen_string_literal: true
+
+require "securerandom"
+require "concurrent/scheduled_task"
+require "concurrent/executor/thread_pool_executor"
+require "concurrent/utility/processor_counter"
module ActiveJob
module QueueAdapters
# == Active Job Async adapter
#
- # When enqueueing jobs with the Async adapter the job will be executed
- # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html].
+ # The Async adapter runs jobs with an in-process thread pool.
+ #
+ # This is the default queue adapter. It's well-suited for dev/test since
+ # it doesn't need an external infrastructure, but it's a poor fit for
+ # production since it drops pending jobs on restart.
+ #
+ # To use this adapter, set queue adapter to +:async+:
#
- # To use +AsyncJob+ set the queue_adapter config to +:async+.
+ # config.active_job.queue_adapter = :async
#
- # Rails.application.config.active_job.queue_adapter = :async
+ # To configure the adapter's thread pool, instantiate the adapter and
+ # pass your own config:
+ #
+ # config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
+ # min_threads: 1,
+ # max_threads: 2 * Concurrent.processor_count,
+ # idletime: 600.seconds
+ #
+ # The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
+ # jobs. Since jobs share a single thread pool, long-running jobs will block
+ # short-lived jobs. Fine for dev/test; bad for production.
class AsyncAdapter
+ # See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html] for executor options.
+ def initialize(**executor_options)
+ @scheduler = Scheduler.new(**executor_options)
+ end
+
def enqueue(job) #:nodoc:
- ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name)
+ @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end
def enqueue_at(job, timestamp) #:nodoc:
- ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name)
+ @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
+ end
+
+ # Gracefully stop processing jobs. Finishes in-progress work and handles
+ # any new jobs following the executor's fallback policy (`caller_runs`).
+ # Waits for termination by default. Pass `wait: false` to continue.
+ def shutdown(wait: true) #:nodoc:
+ @scheduler.shutdown wait: wait
+ end
+
+ # Used for our test suite.
+ def immediate=(immediate) #:nodoc:
+ @scheduler.immediate = immediate
+ end
+
+ # Note that we don't actually need to serialize the jobs since we're
+ # performing them in-process, but we do so anyway for parity with other
+ # adapters and deployment environments. Otherwise, serialization bugs
+ # may creep in undetected.
+ class JobWrapper #:nodoc:
+ def initialize(job)
+ job.provider_job_id = SecureRandom.uuid
+ @job_data = job.serialize
+ end
+
+ def perform
+ Base.execute @job_data
+ end
+ end
+
+ class Scheduler #:nodoc:
+ DEFAULT_EXECUTOR_OPTIONS = {
+ min_threads: 0,
+ max_threads: Concurrent.processor_count,
+ auto_terminate: true,
+ idletime: 60, # 1 minute
+ max_queue: 0, # unlimited
+ fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
+ }.freeze
+
+ attr_accessor :immediate
+
+ def initialize(**options)
+ self.immediate = false
+ @immediate_executor = Concurrent::ImmediateExecutor.new
+ @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
+ end
+
+ def enqueue(job, queue_name:)
+ executor.post(job, &:perform)
+ end
+
+ def enqueue_at(job, timestamp, queue_name:)
+ delay = timestamp - Time.current.to_f
+ if delay > 0
+ Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
+ else
+ enqueue(job, queue_name: queue_name)
+ end
+ end
+
+ def shutdown(wait: true)
+ @async_executor.shutdown
+ @async_executor.wait_for_termination if wait
+ end
+
+ def executor
+ immediate ? @immediate_executor : @async_executor
+ end
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
index 17703e3e41..7dc49310ac 100644
--- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -1,4 +1,6 @@
-require 'backburner'
+# frozen_string_literal: true
+
+require "backburner"
module ActiveJob
module QueueAdapters
@@ -14,12 +16,12 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :backburner
class BackburnerAdapter
def enqueue(job) #:nodoc:
- Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name
+ Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority)
end
def enqueue_at(job, timestamp) #:nodoc:
delay = timestamp - Time.current.to_f
- Backburner::Worker.enqueue JobWrapper, [ job.serialize ], queue: job.queue_name, delay: delay
+ Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority, delay: delay)
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index 0a785fad3b..8eeef32b99 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -1,4 +1,6 @@
-require 'delayed_job'
+# frozen_string_literal: true
+
+require "delayed_job"
module ActiveJob
module QueueAdapters
@@ -32,6 +34,10 @@ module ActiveJob
@job_data = job_data
end
+ def display_name
+ "#{job_data['job_class']} [#{job_data['job_id']}] from DelayedJob(#{job_data['queue_name']}) with arguments: #{job_data['arguments']}"
+ end
+
def perform
Base.execute(job_data)
end
diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
index 8ad5f4de07..ca04dc943c 100644
--- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
@@ -1,8 +1,10 @@
+# frozen_string_literal: true
+
module ActiveJob
module QueueAdapters
# == Active Job Inline adapter
#
- # When enqueueing jobs with the Inline adapter the job will be executed
+ # When enqueuing jobs with the Inline adapter the job will be executed
# immediately.
#
# To use the Inline set the queue_adapter config to +:inline+.
@@ -14,7 +16,7 @@ module ActiveJob
end
def enqueue_at(*) #:nodoc:
- raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at http://guides.rubyonrails.org/active_job_basics.html"
+ raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at https://guides.rubyonrails.org/active_job_basics.html"
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/qu_adapter.rb b/activejob/lib/active_job/queue_adapters/qu_adapter.rb
deleted file mode 100644
index 0e198922fc..0000000000
--- a/activejob/lib/active_job/queue_adapters/qu_adapter.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-require 'qu'
-
-module ActiveJob
- module QueueAdapters
- # == Qu adapter for Active Job
- #
- # Qu is a Ruby library for queuing and processing background jobs. It is
- # heavily inspired by delayed_job and Resque. Qu was created to overcome
- # some shortcomings in the existing queuing libraries.
- # The advantages of Qu are: Multiple backends (redis, mongo), jobs are
- # requeued when worker is killed, resque-like API.
- #
- # Read more about Qu {here}[https://github.com/bkeepers/qu].
- #
- # To use Qu set the queue_adapter config to +:qu+.
- #
- # Rails.application.config.active_job.queue_adapter = :qu
- class QuAdapter
- def enqueue(job, *args) #:nodoc:
- qu_job = Qu::Payload.new(klass: JobWrapper, args: [job.serialize]).tap do |payload|
- payload.instance_variable_set(:@queue, job.queue_name)
- end.push
-
- # qu_job can be nil depending on the configured backend
- job.provider_job_id = qu_job.id unless qu_job.nil?
- qu_job
- end
-
- def enqueue_at(job, timestamp, *args) #:nodoc:
- raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html"
- end
-
- class JobWrapper < Qu::Job #:nodoc:
- def initialize(job_data)
- @job_data = job_data
- end
-
- def perform
- Base.execute @job_data
- end
- end
- end
- end
-end
diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb
index ab13689747..86b5e07743 100644
--- a/activejob/lib/active_job/queue_adapters/que_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb
@@ -1,4 +1,6 @@
-require 'que'
+# frozen_string_literal: true
+
+require "que"
module ActiveJob
module QueueAdapters
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index 0ee41407d8..ccc1881091 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -1,4 +1,6 @@
-require 'queue_classic'
+# frozen_string_literal: true
+
+require "queue_classic"
module ActiveJob
module QueueAdapters
@@ -26,9 +28,9 @@ module ActiveJob
def enqueue_at(job, timestamp) #:nodoc:
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
- raise NotImplementedError, 'To be able to schedule jobs with queue_classic ' \
- 'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
- 'You can implement this yourself or you can use the queue_classic-later gem.'
+ raise NotImplementedError, "To be able to schedule jobs with queue_classic " \
+ "the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. " \
+ "You can implement this yourself or you can use the queue_classic-later gem."
end
qc_job = queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash)
diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
index 417854afd8..590b4ee98d 100644
--- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
@@ -1,12 +1,14 @@
-require 'resque'
-require 'active_support/core_ext/enumerable'
-require 'active_support/core_ext/array/access'
+# frozen_string_literal: true
+
+require "resque"
+require "active_support/core_ext/enumerable"
+require "active_support/core_ext/array/access"
begin
- require 'resque-scheduler'
+ require "resque-scheduler"
rescue LoadError
begin
- require 'resque_scheduler'
+ require "resque_scheduler"
rescue LoadError
false
end
@@ -27,6 +29,7 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :resque
class ResqueAdapter
def enqueue(job) #:nodoc:
+ JobWrapper.instance_variable_set(:@queue, job.queue_name)
Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
index c321776bf5..f726e6ad93 100644
--- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -1,4 +1,6 @@
-require 'sidekiq'
+# frozen_string_literal: true
+
+require "sidekiq"
module ActiveJob
module QueueAdapters
@@ -16,28 +18,28 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :sidekiq
class SidekiqAdapter
def enqueue(job) #:nodoc:
- #Sidekiq::Client does not support symbols as keys
+ # Sidekiq::Client does not support symbols as keys
job.provider_job_id = Sidekiq::Client.push \
- 'class' => JobWrapper,
- 'wrapped' => job.class.to_s,
- 'queue' => job.queue_name,
- 'args' => [ job.serialize ]
+ "class" => JobWrapper,
+ "wrapped" => job.class.to_s,
+ "queue" => job.queue_name,
+ "args" => [ job.serialize ]
end
def enqueue_at(job, timestamp) #:nodoc:
job.provider_job_id = Sidekiq::Client.push \
- 'class' => JobWrapper,
- 'wrapped' => job.class.to_s,
- 'queue' => job.queue_name,
- 'args' => [ job.serialize ],
- 'at' => timestamp
+ "class" => JobWrapper,
+ "wrapped" => job.class.to_s,
+ "queue" => job.queue_name,
+ "args" => [ job.serialize ],
+ "at" => timestamp
end
class JobWrapper #:nodoc:
include Sidekiq::Worker
def perform(job_data)
- Base.execute job_data
+ Base.execute job_data.merge("provider_job_id" => jid)
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
index d78bdecdcb..de98a950d0 100644
--- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -1,5 +1,7 @@
-require 'sneakers'
-require 'monitor'
+# frozen_string_literal: true
+
+require "sneakers"
+require "monitor"
module ActiveJob
module QueueAdapters
@@ -33,7 +35,7 @@ module ActiveJob
class JobWrapper #:nodoc:
include Sneakers::Worker
- from_queue 'default'
+ from_queue "default"
def work(msg)
job_data = ActiveSupport::JSON.decode(msg)
diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
index c6c35f8ab4..d09e1e9143 100644
--- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
@@ -1,16 +1,16 @@
-require 'sucker_punch'
+# frozen_string_literal: true
+
+require "sucker_punch"
module ActiveJob
module QueueAdapters
# == Sucker Punch adapter for Active Job
#
# Sucker Punch is a single-process Ruby asynchronous processing library.
- # It's girl_friday and DSL sugar on top of Celluloid. With Celluloid's
- # actor pattern, we can do asynchronous processing within a single process.
- # This reduces costs of hosting on a service like Heroku along with the
- # memory footprint of having to maintain additional jobs if hosting on
- # a dedicated server. All queues can run within a single Rails/Sinatra
- # process.
+ # This reduces the cost of hosting on a service like Heroku along
+ # with the memory footprint of having to maintain additional jobs if
+ # hosting on a dedicated server. All queues can run within a
+ # single application (eg. Rails, Sinatra, etc.) process.
#
# Read more about Sucker Punch {here}[https://github.com/brandonhilkert/sucker_punch].
#
@@ -19,11 +19,22 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :sucker_punch
class SuckerPunchAdapter
def enqueue(job) #:nodoc:
- JobWrapper.new.async.perform job.serialize
+ if JobWrapper.respond_to?(:perform_async)
+ # sucker_punch 2.0 API
+ JobWrapper.perform_async job.serialize
+ else
+ # sucker_punch 1.0 API
+ JobWrapper.new.async.perform job.serialize
+ end
end
def enqueue_at(job, timestamp) #:nodoc:
- raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html"
+ if JobWrapper.respond_to?(:perform_in)
+ delay = timestamp - Time.current.to_f
+ JobWrapper.perform_in delay, job.serialize
+ else
+ raise NotImplementedError, "sucker_punch 1.0 does not support `enqueued_at`. Please upgrade to version ~> 2.0.0 to enable this behavior."
+ end
end
class JobWrapper #:nodoc:
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
index 9b7b7139f4..f73ad444ba 100644
--- a/activejob/lib/active_job/queue_adapters/test_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
module ActiveJob
module QueueAdapters
# == Test adapter for Active Job
@@ -10,7 +12,7 @@ module ActiveJob
#
# Rails.application.config.active_job.queue_adapter = :test
class TestAdapter
- attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter)
+ attr_accessor(:perform_enqueued_jobs, :perform_enqueued_at_jobs, :filter, :reject, :queue)
attr_writer(:enqueued_jobs, :performed_jobs)
# Provides a store of all the enqueued jobs with the TestAdapter so you can check them.
@@ -27,34 +29,47 @@ module ActiveJob
return if filtered?(job)
job_data = job_to_hash(job)
- enqueue_or_perform(perform_enqueued_jobs, job, job_data)
+ perform_or_enqueue(perform_enqueued_jobs, job, job_data)
end
def enqueue_at(job, timestamp) #:nodoc:
return if filtered?(job)
job_data = job_to_hash(job, at: timestamp)
- enqueue_or_perform(perform_enqueued_at_jobs, job, job_data)
+ perform_or_enqueue(perform_enqueued_at_jobs, job, job_data)
end
private
+ def job_to_hash(job, extras = {})
+ { job: job.class, args: job.serialize.fetch("arguments"), queue: job.queue_name }.merge!(extras)
+ end
- def job_to_hash(job, extras = {})
- { job: job.class, args: job.serialize.fetch('arguments'), queue: job.queue_name }.merge!(extras)
- end
+ def perform_or_enqueue(perform, job, job_data)
+ if perform
+ performed_jobs << job_data
+ Base.execute job.serialize
+ else
+ enqueued_jobs << job_data
+ end
+ end
- def enqueue_or_perform(perform, job, job_data)
- if perform
- performed_jobs << job_data
- Base.execute job.serialize
- else
- enqueued_jobs << job_data
+ def filtered?(job)
+ filtered_queue?(job) || filtered_job_class?(job)
end
- end
- def filtered?(job)
- filter && !Array(filter).include?(job.class)
- end
+ def filtered_queue?(job)
+ if queue
+ job.queue_name != queue.to_s
+ end
+ end
+
+ def filtered_job_class?(job)
+ if filter
+ !Array(filter).include?(job.class)
+ elsif reject
+ Array(reject).include?(job.class)
+ end
+ end
end
end
end
diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb
index 65786a49ff..9dc6bc7f2e 100644
--- a/activejob/lib/active_job/queue_name.rb
+++ b/activejob/lib/active_job/queue_name.rb
@@ -1,11 +1,13 @@
+# frozen_string_literal: true
+
module ActiveJob
module QueueName
extend ActiveSupport::Concern
# Includes the ability to override the default queue name and prefix.
module ClassMethods
- mattr_accessor(:queue_name_prefix)
- mattr_accessor(:default_queue_name) { "default" }
+ mattr_accessor :queue_name_prefix
+ mattr_accessor :default_queue_name, default: "default"
# Specifies the name of the queue to process the job on.
#
@@ -16,7 +18,7 @@ module ActiveJob
# post.to_feed!
# end
# end
- def queue_as(part_name=nil, &block)
+ def queue_as(part_name = nil, &block)
if block_given?
self.queue_name = block
else
@@ -32,11 +34,8 @@ module ActiveJob
end
included do
- class_attribute :queue_name, instance_accessor: false
- class_attribute :queue_name_delimiter, instance_accessor: false
-
- self.queue_name = default_queue_name
- self.queue_name_delimiter = '_' # set default delimiter to '_'
+ class_attribute :queue_name, instance_accessor: false, default: default_queue_name
+ class_attribute :queue_name_delimiter, instance_accessor: false, default: "_"
end
# Returns the name of the queue the job will be run on.
@@ -46,6 +45,5 @@ module ActiveJob
end
@queue_name
end
-
end
end
diff --git a/activejob/lib/active_job/queue_priority.rb b/activejob/lib/active_job/queue_priority.rb
index 01d84910ff..063bccdb01 100644
--- a/activejob/lib/active_job/queue_priority.rb
+++ b/activejob/lib/active_job/queue_priority.rb
@@ -1,10 +1,12 @@
+# frozen_string_literal: true
+
module ActiveJob
module QueuePriority
extend ActiveSupport::Concern
# Includes the ability to override the default queue priority.
module ClassMethods
- mattr_accessor(:default_priority)
+ mattr_accessor :default_priority
# Specifies the priority of the queue to create the job with.
#
@@ -17,7 +19,7 @@ module ActiveJob
# end
#
# Specify either an argument or a block.
- def queue_with_priority(priority=nil, &block)
+ def queue_with_priority(priority = nil, &block)
if block_given?
self.priority = block
else
@@ -27,9 +29,7 @@ module ActiveJob
end
included do
- class_attribute :priority, instance_accessor: false
-
- self.priority = default_priority
+ class_attribute :priority, instance_accessor: false, default: default_priority
end
# Returns the priority that the job will be created with
@@ -39,6 +39,5 @@ module ActiveJob
end
@priority
end
-
end
end
diff --git a/activejob/lib/active_job/railtie.rb b/activejob/lib/active_job/railtie.rb
index 6538ac1b30..ecc0908d5f 100644
--- a/activejob/lib/active_job/railtie.rb
+++ b/activejob/lib/active_job/railtie.rb
@@ -1,23 +1,49 @@
-require 'global_id/railtie'
-require 'active_job'
+# frozen_string_literal: true
+
+require "global_id/railtie"
+require "active_job"
module ActiveJob
# = Active Job Railtie
class Railtie < Rails::Railtie # :nodoc:
config.active_job = ActiveSupport::OrderedOptions.new
+ config.active_job.custom_serializers = []
- initializer 'active_job.logger' do
+ initializer "active_job.logger" do
ActiveSupport.on_load(:active_job) { self.logger = ::Rails.logger }
end
+ initializer "active_job.custom_serializers" do |app|
+ config.after_initialize do
+ custom_serializers = app.config.active_job.delete(:custom_serializers)
+ ActiveJob::Serializers.add_serializers custom_serializers
+ end
+ end
+
initializer "active_job.set_configs" do |app|
options = app.config.active_job
- options.queue_adapter ||= :inline
+ options.queue_adapter ||= :async
ActiveSupport.on_load(:active_job) do
- options.each { |k,v| send("#{k}=", v) }
+ options.each do |k, v|
+ k = "#{k}="
+ send(k, v) if respond_to? k
+ end
+ end
+
+ ActiveSupport.on_load(:action_dispatch_integration_test) do
+ include ActiveJob::TestHelper
end
end
+ initializer "active_job.set_reloader_hook" do |app|
+ ActiveSupport.on_load(:active_job) do
+ ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, prepend: true) do |_, inner|
+ app.reloader.wrap do
+ inner.call
+ end
+ end
+ end
+ end
end
end
diff --git a/activejob/lib/active_job/serializers.rb b/activejob/lib/active_job/serializers.rb
new file mode 100644
index 0000000000..a5d90f48b8
--- /dev/null
+++ b/activejob/lib/active_job/serializers.rb
@@ -0,0 +1,63 @@
+# frozen_string_literal: true
+
+require "set"
+
+module ActiveJob
+ # The <tt>ActiveJob::Serializers</tt> module is used to store a list of known serializers
+ # and to add new ones. It also has helpers to serialize/deserialize objects.
+ module Serializers # :nodoc:
+ extend ActiveSupport::Autoload
+
+ autoload :ObjectSerializer
+ autoload :SymbolSerializer
+ autoload :DurationSerializer
+ autoload :DateTimeSerializer
+ autoload :DateSerializer
+ autoload :TimeWithZoneSerializer
+ autoload :TimeSerializer
+
+ mattr_accessor :_additional_serializers
+ self._additional_serializers = Set.new
+
+ class << self
+ # Returns serialized representative of the passed object.
+ # Will look up through all known serializers.
+ # Raises <tt>ActiveJob::SerializationError</tt> if it can't find a proper serializer.
+ def serialize(argument)
+ serializer = serializers.detect { |s| s.serialize?(argument) }
+ raise SerializationError.new("Unsupported argument type: #{argument.class.name}") unless serializer
+ serializer.serialize(argument)
+ end
+
+ # Returns deserialized object.
+ # Will look up through all known serializers.
+ # If no serializer found will raise <tt>ArgumentError</tt>.
+ def deserialize(argument)
+ serializer_name = argument[Arguments::OBJECT_SERIALIZER_KEY]
+ raise ArgumentError, "Serializer name is not present in the argument: #{argument.inspect}" unless serializer_name
+
+ serializer = serializer_name.safe_constantize
+ raise ArgumentError, "Serializer #{serializer_name} is not known" unless serializer
+
+ serializer.deserialize(argument)
+ end
+
+ # Returns list of known serializers.
+ def serializers
+ self._additional_serializers
+ end
+
+ # Adds new serializers to a list of known serializers.
+ def add_serializers(*new_serializers)
+ self._additional_serializers += new_serializers.flatten
+ end
+ end
+
+ add_serializers SymbolSerializer,
+ DurationSerializer,
+ DateTimeSerializer,
+ DateSerializer,
+ TimeWithZoneSerializer,
+ TimeSerializer
+ end
+end
diff --git a/activejob/lib/active_job/serializers/date_serializer.rb b/activejob/lib/active_job/serializers/date_serializer.rb
new file mode 100644
index 0000000000..e995d30faa
--- /dev/null
+++ b/activejob/lib/active_job/serializers/date_serializer.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ class DateSerializer < ObjectSerializer # :nodoc:
+ def serialize(date)
+ super("value" => date.iso8601)
+ end
+
+ def deserialize(hash)
+ Date.iso8601(hash["value"])
+ end
+
+ private
+
+ def klass
+ Date
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/serializers/date_time_serializer.rb b/activejob/lib/active_job/serializers/date_time_serializer.rb
new file mode 100644
index 0000000000..fe780a1978
--- /dev/null
+++ b/activejob/lib/active_job/serializers/date_time_serializer.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ class DateTimeSerializer < ObjectSerializer # :nodoc:
+ def serialize(time)
+ super("value" => time.iso8601)
+ end
+
+ def deserialize(hash)
+ DateTime.iso8601(hash["value"])
+ end
+
+ private
+
+ def klass
+ DateTime
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/serializers/duration_serializer.rb b/activejob/lib/active_job/serializers/duration_serializer.rb
new file mode 100644
index 0000000000..715fe27a5c
--- /dev/null
+++ b/activejob/lib/active_job/serializers/duration_serializer.rb
@@ -0,0 +1,24 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ class DurationSerializer < ObjectSerializer # :nodoc:
+ def serialize(duration)
+ super("value" => duration.value, "parts" => Arguments.serialize(duration.parts))
+ end
+
+ def deserialize(hash)
+ value = hash["value"]
+ parts = Arguments.deserialize(hash["parts"])
+
+ klass.new(value, parts)
+ end
+
+ private
+
+ def klass
+ ActiveSupport::Duration
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/serializers/object_serializer.rb b/activejob/lib/active_job/serializers/object_serializer.rb
new file mode 100644
index 0000000000..6d280969be
--- /dev/null
+++ b/activejob/lib/active_job/serializers/object_serializer.rb
@@ -0,0 +1,54 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ # Base class for serializing and deserializing custom objects.
+ #
+ # Example:
+ #
+ # class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
+ # def serialize(money)
+ # super("amount" => money.amount, "currency" => money.currency)
+ # end
+ #
+ # def deserialize(hash)
+ # Money.new(hash["amount"], hash["currency"])
+ # end
+ #
+ # private
+ #
+ # def klass
+ # Money
+ # end
+ # end
+ class ObjectSerializer
+ include Singleton
+
+ class << self
+ delegate :serialize?, :serialize, :deserialize, to: :instance
+ end
+
+ # Determines if an argument should be serialized by a serializer.
+ def serialize?(argument)
+ argument.is_a?(klass)
+ end
+
+ # Serializes an argument to a JSON primitive type.
+ def serialize(hash)
+ { Arguments::OBJECT_SERIALIZER_KEY => self.class.name }.merge!(hash)
+ end
+
+ # Deserializes an argument from a JSON primitive type.
+ def deserialize(_argument)
+ raise NotImplementedError
+ end
+
+ private
+
+ # The class of the object that will be serialized.
+ def klass # :doc:
+ raise NotImplementedError
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/serializers/symbol_serializer.rb b/activejob/lib/active_job/serializers/symbol_serializer.rb
new file mode 100644
index 0000000000..7e1f9553a2
--- /dev/null
+++ b/activejob/lib/active_job/serializers/symbol_serializer.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ class SymbolSerializer < ObjectSerializer # :nodoc:
+ def serialize(argument)
+ super("value" => argument.to_s)
+ end
+
+ def deserialize(argument)
+ argument["value"].to_sym
+ end
+
+ private
+
+ def klass
+ Symbol
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/serializers/time_serializer.rb b/activejob/lib/active_job/serializers/time_serializer.rb
new file mode 100644
index 0000000000..fe20772f35
--- /dev/null
+++ b/activejob/lib/active_job/serializers/time_serializer.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ class TimeSerializer < ObjectSerializer # :nodoc:
+ def serialize(time)
+ super("value" => time.iso8601)
+ end
+
+ def deserialize(hash)
+ Time.iso8601(hash["value"])
+ end
+
+ private
+
+ def klass
+ Time
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/serializers/time_with_zone_serializer.rb b/activejob/lib/active_job/serializers/time_with_zone_serializer.rb
new file mode 100644
index 0000000000..43017fc75b
--- /dev/null
+++ b/activejob/lib/active_job/serializers/time_with_zone_serializer.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ class TimeWithZoneSerializer < ObjectSerializer # :nodoc:
+ def serialize(time)
+ super("value" => time.iso8601)
+ end
+
+ def deserialize(hash)
+ Time.iso8601(hash["value"]).in_time_zone
+ end
+
+ private
+
+ def klass
+ ActiveSupport::TimeWithZone
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/test_case.rb b/activejob/lib/active_job/test_case.rb
index d894a7b5cd..49cd51bdd0 100644
--- a/activejob/lib/active_job/test_case.rb
+++ b/activejob/lib/active_job/test_case.rb
@@ -1,7 +1,11 @@
-require 'active_support/test_case'
+# frozen_string_literal: true
+
+require "active_support/test_case"
module ActiveJob
class TestCase < ActiveSupport::TestCase
include ActiveJob::TestHelper
+
+ ActiveSupport.run_load_hooks(:active_job_test_case, self)
end
end
diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb
index 44ddfa5f69..0deb68d0d2 100644
--- a/activejob/lib/active_job/test_helper.rb
+++ b/activejob/lib/active_job/test_helper.rb
@@ -1,328 +1,633 @@
-require 'active_support/core_ext/class/subclasses'
-require 'active_support/core_ext/hash/keys'
+# frozen_string_literal: true
+
+require "active_support/core_ext/class/subclasses"
+require "active_support/core_ext/hash/keys"
module ActiveJob
# Provides helper methods for testing Active Job
module TestHelper
- extend ActiveSupport::Concern
-
- included do
- def before_setup # :nodoc:
- test_adapter = ActiveJob::QueueAdapters::TestAdapter.new
+ delegate :enqueued_jobs, :enqueued_jobs=,
+ :performed_jobs, :performed_jobs=,
+ to: :queue_adapter
- @old_queue_adapters = (ActiveJob::Base.subclasses << ActiveJob::Base).select do |klass|
- # only override explicitly set adapters, a quirk of `class_attribute`
- klass.singleton_class.public_instance_methods(false).include?(:_queue_adapter)
- end.map do |klass|
- [klass, klass.queue_adapter].tap do
- klass.queue_adapter = test_adapter
- end
- end
+ module TestQueueAdapter
+ extend ActiveSupport::Concern
- clear_enqueued_jobs
- clear_performed_jobs
- super
+ included do
+ class_attribute :_test_adapter, instance_accessor: false, instance_predicate: false
end
- def after_teardown # :nodoc:
- super
- @old_queue_adapters.each do |(klass, adapter)|
- klass.queue_adapter = adapter
+ module ClassMethods
+ def queue_adapter
+ self._test_adapter.nil? ? super : self._test_adapter
+ end
+
+ def disable_test_adapter
+ self._test_adapter = nil
end
- end
- # Asserts that the number of enqueued jobs matches the given number.
- #
- # def test_jobs
- # assert_enqueued_jobs 0
- # HelloJob.perform_later('david')
- # assert_enqueued_jobs 1
- # HelloJob.perform_later('abdelkader')
- # assert_enqueued_jobs 2
- # end
- #
- # If a block is passed, that block should cause the specified number of
- # jobs to be enqueued.
- #
- # def test_jobs_again
- # assert_enqueued_jobs 1 do
- # HelloJob.perform_later('cristian')
- # end
- #
- # assert_enqueued_jobs 2 do
- # HelloJob.perform_later('aaron')
- # HelloJob.perform_later('rafael')
- # end
- # end
- #
- # The number of times a specific job is enqueued can be asserted.
- #
- # def test_logging_job
- # assert_enqueued_jobs 2, only: LoggingJob do
- # LoggingJob.perform_later
- # HelloJob.perform_later('jeremy')
- # end
- # end
- def assert_enqueued_jobs(number, only: nil)
- if block_given?
- original_count = enqueued_jobs_size(only: only)
- yield
- new_count = enqueued_jobs_size(only: only)
- assert_equal number, new_count - original_count, "#{number} jobs expected, but #{new_count - original_count} were enqueued"
- else
- actual_count = enqueued_jobs_size(only: only)
- assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued"
+ def enable_test_adapter(test_adapter)
+ self._test_adapter = test_adapter
end
end
+ end
+
+ ActiveJob::Base.include(TestQueueAdapter)
+
+ def before_setup # :nodoc:
+ test_adapter = queue_adapter_for_test
- # Asserts that no jobs have been enqueued.
- #
- # def test_jobs
- # assert_no_enqueued_jobs
- # HelloJob.perform_later('jeremy')
- # assert_enqueued_jobs 1
- # end
- #
- # If a block is passed, that block should not cause any job to be enqueued.
- #
- # def test_jobs_again
- # assert_no_enqueued_jobs do
- # # No job should be enqueued from this block
- # end
- # end
- #
- # It can be asserted that no jobs of a specific kind are enqueued:
- #
- # def test_no_logging
- # assert_no_enqueued_jobs only: LoggingJob do
- # HelloJob.perform_later('jeremy')
- # end
- # end
- #
- # Note: This assertion is simply a shortcut for:
- #
- # assert_enqueued_jobs 0, &block
- def assert_no_enqueued_jobs(only: nil, &block)
- assert_enqueued_jobs 0, only: only, &block
+ queue_adapter_changed_jobs.each do |klass|
+ klass.enable_test_adapter(test_adapter)
end
- # Asserts that the number of performed jobs matches the given number.
- # If no block is passed, <tt>perform_enqueued_jobs</tt>
- # must be called around the job call.
- #
- # def test_jobs
- # assert_performed_jobs 0
- #
- # perform_enqueued_jobs do
- # HelloJob.perform_later('xavier')
- # end
- # assert_performed_jobs 1
- #
- # perform_enqueued_jobs do
- # HelloJob.perform_later('yves')
- # assert_performed_jobs 2
- # end
- # end
- #
- # If a block is passed, that block should cause the specified number of
- # jobs to be performed.
- #
- # def test_jobs_again
- # assert_performed_jobs 1 do
- # HelloJob.perform_later('robin')
- # end
- #
- # assert_performed_jobs 2 do
- # HelloJob.perform_later('carlos')
- # HelloJob.perform_later('sean')
- # end
- # end
- #
- # The block form supports filtering. If the :only option is specified,
- # then only the listed job(s) will be performed.
- #
- # def test_hello_job
- # assert_performed_jobs 1, only: HelloJob do
- # HelloJob.perform_later('jeremy')
- # LoggingJob.perform_later
- # end
- # end
- #
- # An array may also be specified, to support testing multiple jobs.
- #
- # def test_hello_and_logging_jobs
- # assert_nothing_raised do
- # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
- # HelloJob.perform_later('jeremy')
- # LoggingJob.perform_later('stewie')
- # RescueJob.perform_later('david')
- # end
- # end
- # end
- def assert_performed_jobs(number, only: nil)
- if block_given?
- original_count = performed_jobs.size
- perform_enqueued_jobs(only: only) { yield }
- new_count = performed_jobs.size
- assert_equal number, new_count - original_count,
- "#{number} jobs expected, but #{new_count - original_count} were performed"
- else
- performed_jobs_size = performed_jobs.size
- assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed"
- end
+ clear_enqueued_jobs
+ clear_performed_jobs
+ super
+ end
+
+ def after_teardown # :nodoc:
+ super
+
+ queue_adapter_changed_jobs.each { |klass| klass.disable_test_adapter }
+ end
+
+ # Specifies the queue adapter to use with all Active Job test helpers.
+ #
+ # Returns an instance of the queue adapter and defaults to
+ # <tt>ActiveJob::QueueAdapters::TestAdapter</tt>.
+ #
+ # Note: The adapter provided by this method must provide some additional
+ # methods from those expected of a standard <tt>ActiveJob::QueueAdapter</tt>
+ # in order to be used with the active job test helpers. Refer to
+ # <tt>ActiveJob::QueueAdapters::TestAdapter</tt>.
+ def queue_adapter_for_test
+ ActiveJob::QueueAdapters::TestAdapter.new
+ end
+
+ # Asserts that the number of enqueued jobs matches the given number.
+ #
+ # def test_jobs
+ # assert_enqueued_jobs 0
+ # HelloJob.perform_later('david')
+ # assert_enqueued_jobs 1
+ # HelloJob.perform_later('abdelkader')
+ # assert_enqueued_jobs 2
+ # end
+ #
+ # If a block is passed, asserts that the block will cause the specified number of
+ # jobs to be enqueued.
+ #
+ # def test_jobs_again
+ # assert_enqueued_jobs 1 do
+ # HelloJob.perform_later('cristian')
+ # end
+ #
+ # assert_enqueued_jobs 2 do
+ # HelloJob.perform_later('aaron')
+ # HelloJob.perform_later('rafael')
+ # end
+ # end
+ #
+ # Asserts the number of times a specific job was enqueued by passing +:only+ option.
+ #
+ # def test_logging_job
+ # assert_enqueued_jobs 1, only: LoggingJob do
+ # LoggingJob.perform_later
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ #
+ # Asserts the number of times a job except specific class was enqueued by passing +:except+ option.
+ #
+ # def test_logging_job
+ # assert_enqueued_jobs 1, except: HelloJob do
+ # LoggingJob.perform_later
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ #
+ # Asserts the number of times a job is enqueued to a specific queue by passing +:queue+ option.
+ #
+ # def test_logging_job
+ # assert_enqueued_jobs 2, queue: 'default' do
+ # LoggingJob.perform_later
+ # HelloJob.perform_later('elfassy')
+ # end
+ # end
+ def assert_enqueued_jobs(number, only: nil, except: nil, queue: nil)
+ if block_given?
+ original_count = enqueued_jobs_with(only: only, except: except, queue: queue)
+
+ yield
+
+ new_count = enqueued_jobs_with(only: only, except: except, queue: queue)
+
+ actual_count = new_count - original_count
+ else
+ actual_count = enqueued_jobs_with(only: only, except: except, queue: queue)
end
- # Asserts that no jobs have been performed.
- #
- # def test_jobs
- # assert_no_performed_jobs
- #
- # perform_enqueued_jobs do
- # HelloJob.perform_later('matthew')
- # assert_performed_jobs 1
- # end
- # end
- #
- # If a block is passed, that block should not cause any job to be performed.
- #
- # def test_jobs_again
- # assert_no_performed_jobs do
- # # No job should be performed from this block
- # end
- # end
- #
- # The block form supports filtering. If the :only option is specified,
- # then only the listed job(s) will be performed.
- #
- # def test_hello_job
- # assert_performed_jobs 1, only: HelloJob do
- # HelloJob.perform_later('jeremy')
- # LoggingJob.perform_later
- # end
- # end
- #
- # An array may also be specified, to support testing multiple jobs.
- #
- # def test_hello_and_logging_jobs
- # assert_nothing_raised do
- # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
- # HelloJob.perform_later('jeremy')
- # LoggingJob.perform_later('stewie')
- # RescueJob.perform_later('david')
- # end
- # end
- # end
- #
- # Note: This assertion is simply a shortcut for:
- #
- # assert_performed_jobs 0, &block
- def assert_no_performed_jobs(only: nil, &block)
- assert_performed_jobs 0, only: only, &block
+ assert_equal number, actual_count, "#{number} jobs expected, but #{actual_count} were enqueued"
+ end
+
+ # Asserts that no jobs have been enqueued.
+ #
+ # def test_jobs
+ # assert_no_enqueued_jobs
+ # HelloJob.perform_later('jeremy')
+ # assert_enqueued_jobs 1
+ # end
+ #
+ # If a block is passed, asserts that the block will not cause any job to be enqueued.
+ #
+ # def test_jobs_again
+ # assert_no_enqueued_jobs do
+ # # No job should be enqueued from this block
+ # end
+ # end
+ #
+ # Asserts that no jobs of a specific kind are enqueued by passing +:only+ option.
+ #
+ # def test_no_logging
+ # assert_no_enqueued_jobs only: LoggingJob do
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ #
+ # Asserts that no jobs except specific class are enqueued by passing +:except+ option.
+ #
+ # def test_no_logging
+ # assert_no_enqueued_jobs except: HelloJob do
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ #
+ # Asserts that no jobs are enqueued to a specific queue by passing +:queue+ option
+ #
+ # def test_no_logging
+ # assert_no_enqueued_jobs queue: 'default' do
+ # LoggingJob.set(queue: :some_queue).perform_later
+ # end
+ # end
+ #
+ # Note: This assertion is simply a shortcut for:
+ #
+ # assert_enqueued_jobs 0, &block
+ def assert_no_enqueued_jobs(only: nil, except: nil, queue: nil, &block)
+ assert_enqueued_jobs 0, only: only, except: except, queue: queue, &block
+ end
+
+ # Asserts that the number of performed jobs matches the given number.
+ # If no block is passed, <tt>perform_enqueued_jobs</tt>
+ # must be called around or after the job call.
+ #
+ # def test_jobs
+ # assert_performed_jobs 0
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('xavier')
+ # end
+ # assert_performed_jobs 1
+ #
+ # HelloJob.perform_later('yves')
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_jobs 2
+ # end
+ #
+ # If a block is passed, asserts that the block will cause the specified number of
+ # jobs to be performed.
+ #
+ # def test_jobs_again
+ # assert_performed_jobs 1 do
+ # HelloJob.perform_later('robin')
+ # end
+ #
+ # assert_performed_jobs 2 do
+ # HelloJob.perform_later('carlos')
+ # HelloJob.perform_later('sean')
+ # end
+ # end
+ #
+ # This method also supports filtering. If the +:only+ option is specified,
+ # then only the listed job(s) will be performed.
+ #
+ # def test_hello_job
+ # assert_performed_jobs 1, only: HelloJob do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later
+ # end
+ # end
+ #
+ # Also if the +:except+ option is specified,
+ # then the job(s) except specific class will be performed.
+ #
+ # def test_hello_job
+ # assert_performed_jobs 1, except: LoggingJob do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later
+ # end
+ # end
+ #
+ # An array may also be specified, to support testing multiple jobs.
+ #
+ # def test_hello_and_logging_jobs
+ # assert_nothing_raised do
+ # assert_performed_jobs 2, only: [HelloJob, LoggingJob] do
+ # HelloJob.perform_later('jeremy')
+ # LoggingJob.perform_later('stewie')
+ # RescueJob.perform_later('david')
+ # end
+ # end
+ # end
+ #
+ # If the +:queue+ option is specified,
+ # then only the job(s) enqueued to a specific queue will be performed.
+ #
+ # def test_assert_performed_jobs_with_queue_option
+ # assert_performed_jobs 1, queue: :some_queue do
+ # HelloJob.set(queue: :some_queue).perform_later("jeremy")
+ # HelloJob.set(queue: :other_queue).perform_later("bogdan")
+ # end
+ # end
+ def assert_performed_jobs(number, only: nil, except: nil, queue: nil, &block)
+ if block_given?
+ original_count = performed_jobs.size
+
+ perform_enqueued_jobs(only: only, except: except, queue: queue, &block)
+
+ new_count = performed_jobs.size
+
+ performed_jobs_size = new_count - original_count
+ else
+ performed_jobs_size = performed_jobs_with(only: only, except: except, queue: queue)
end
- # Asserts that the job passed in the block has been enqueued with the given arguments.
- #
- # def test_assert_enqueued_with
- # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do
- # MyJob.perform_later(1,2,3)
- # end
- #
- # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) do
- # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
- # end
- # end
- def assert_enqueued_with(args = {})
+ assert_equal number, performed_jobs_size, "#{number} jobs expected, but #{performed_jobs_size} were performed"
+ end
+
+ # Asserts that no jobs have been performed.
+ #
+ # def test_jobs
+ # assert_no_performed_jobs
+ #
+ # perform_enqueued_jobs do
+ # HelloJob.perform_later('matthew')
+ # assert_performed_jobs 1
+ # end
+ # end
+ #
+ # If a block is passed, asserts that the block will not cause any job to be performed.
+ #
+ # def test_jobs_again
+ # assert_no_performed_jobs do
+ # # No job should be performed from this block
+ # end
+ # end
+ #
+ # The block form supports filtering. If the +:only+ option is specified,
+ # then only the listed job(s) will not be performed.
+ #
+ # def test_no_logging
+ # assert_no_performed_jobs only: LoggingJob do
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ #
+ # Also if the +:except+ option is specified,
+ # then the job(s) except specific class will not be performed.
+ #
+ # def test_no_logging
+ # assert_no_performed_jobs except: HelloJob do
+ # HelloJob.perform_later('jeremy')
+ # end
+ # end
+ #
+ # If the +:queue+ option is specified,
+ # then only the job(s) enqueued to a specific queue will not be performed.
+ #
+ # def test_assert_no_performed_jobs_with_queue_option
+ # assert_no_performed_jobs queue: :some_queue do
+ # HelloJob.set(queue: :other_queue).perform_later("jeremy")
+ # end
+ # end
+ #
+ # Note: This assertion is simply a shortcut for:
+ #
+ # assert_performed_jobs 0, &block
+ def assert_no_performed_jobs(only: nil, except: nil, queue: nil, &block)
+ assert_performed_jobs 0, only: only, except: except, queue: queue, &block
+ end
+
+ # Asserts that the job has been enqueued with the given arguments.
+ #
+ # def test_assert_enqueued_with
+ # MyJob.perform_later(1,2,3)
+ # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low')
+ #
+ # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
+ # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon)
+ # end
+ #
+ #
+ # The +args+ argument also accepts a proc which will get passed the actual
+ # job's arguments. Your proc needs to returns a boolean value determining if
+ # the job's arguments matches your expectation. This is useful to check only
+ # for a subset of arguments.
+ #
+ # def test_assert_enqueued_with
+ # expected_args = ->(job_args) do
+ # assert job_args.first.key?(:foo)
+ # end
+ #
+ # MyJob.perform_later(foo: 'bar', other_arg: 'No need to check in the test')
+ # assert_enqueued_with(job: MyJob, args: expected_args, queue: 'low')
+ # end
+ #
+ #
+ # If a block is passed, asserts that the block will cause the job to be
+ # enqueued with the given arguments.
+ #
+ # def test_assert_enqueued_with
+ # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do
+ # MyJob.perform_later(1,2,3)
+ # end
+ #
+ # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) do
+ # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
+ # end
+ # end
+ def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil)
+ expected = { job: job, args: args, at: at, queue: queue }.compact
+ expected_args = prepare_args_for_assertion(expected)
+
+ if block_given?
original_enqueued_jobs_count = enqueued_jobs.count
- args.assert_valid_keys(:job, :args, :at, :queue)
- serialized_args = serialize_args_for_assertion(args)
+
yield
- in_block_jobs = enqueued_jobs.drop(original_enqueued_jobs_count)
- matching_job = in_block_jobs.find do |job|
- serialized_args.all? { |key, value| value == job[key] }
+
+ jobs = enqueued_jobs.drop(original_enqueued_jobs_count)
+ else
+ jobs = enqueued_jobs
+ end
+
+ matching_job = jobs.find do |enqueued_job|
+ deserialized_job = deserialize_args_for_assertion(enqueued_job)
+
+ expected_args.all? do |key, value|
+ if value.respond_to?(:call)
+ value.call(deserialized_job[key])
+ else
+ value == deserialized_job[key]
+ end
end
- assert matching_job, "No enqueued job found with #{args}"
- instantiate_job(matching_job)
end
- # Asserts that the job passed in the block has been performed with the given arguments.
- #
- # def test_assert_performed_with
- # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do
- # MyJob.perform_later(1,2,3)
- # end
- #
- # assert_performed_with(job: MyJob, at: Date.tomorrow.noon) do
- # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
- # end
- # end
- def assert_performed_with(args = {})
+ assert matching_job, "No enqueued job found with #{expected}"
+ instantiate_job(matching_job)
+ end
+
+ # Asserts that the job has been performed with the given arguments.
+ #
+ # def test_assert_performed_with
+ # MyJob.perform_later(1,2,3)
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high')
+ #
+ # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_with(job: MyJob, at: Date.tomorrow.noon)
+ # end
+ #
+ # The +args+ argument also accepts a proc which will get passed the actual
+ # job's arguments. Your proc needs to returns a boolean value determining if
+ # the job's arguments matches your expectation. This is useful to check only
+ # for a subset of arguments.
+ #
+ # def test_assert_performed_with
+ # expected_args = ->(job_args) do
+ # assert job_args.first.key?(:foo)
+ # end
+ # MyJob.perform_later(foo: 'bar', other_arg: 'No need to check in the test')
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_with(job: MyJob, args: expected_args, queue: 'high')
+ # end
+ #
+ # If a block is passed, that block performs all of the jobs that were
+ # enqueued throughout the duration of the block and asserts that
+ # the job has been performed with the given arguments in the block.
+ #
+ # def test_assert_performed_with
+ # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do
+ # MyJob.perform_later(1,2,3)
+ # end
+ #
+ # assert_performed_with(job: MyJob, at: Date.tomorrow.noon) do
+ # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
+ # end
+ # end
+ def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block)
+ expected = { job: job, args: args, at: at, queue: queue }.compact
+ expected_args = prepare_args_for_assertion(expected)
+
+ if block_given?
original_performed_jobs_count = performed_jobs.count
- args.assert_valid_keys(:job, :args, :at, :queue)
- serialized_args = serialize_args_for_assertion(args)
- perform_enqueued_jobs { yield }
- in_block_jobs = performed_jobs.drop(original_performed_jobs_count)
- matching_job = in_block_jobs.find do |job|
- serialized_args.all? { |key, value| value == job[key] }
- end
- assert matching_job, "No performed job found with #{args}"
- instantiate_job(matching_job)
+
+ perform_enqueued_jobs(&block)
+
+ jobs = performed_jobs.drop(original_performed_jobs_count)
+ else
+ jobs = performed_jobs
end
- def perform_enqueued_jobs(only: nil)
- old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
- old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs
- old_filter = queue_adapter.filter
-
- begin
- queue_adapter.perform_enqueued_jobs = true
- queue_adapter.perform_enqueued_at_jobs = true
- queue_adapter.filter = only
- yield
- ensure
- queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs
- queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs
- queue_adapter.filter = old_filter
+ matching_job = jobs.find do |enqueued_job|
+ deserialized_job = deserialize_args_for_assertion(enqueued_job)
+
+ expected_args.all? do |key, value|
+ if value.respond_to?(:call)
+ value.call(deserialized_job[key])
+ else
+ value == deserialized_job[key]
+ end
end
end
- def queue_adapter
- ActiveJob::Base.queue_adapter
+ assert matching_job, "No performed job found with #{expected}"
+ instantiate_job(matching_job)
+ end
+
+ # Performs all enqueued jobs. If a block is given, performs all of the jobs
+ # that were enqueued throughout the duration of the block. If a block is
+ # not given, performs all of the enqueued jobs up to this point in the test.
+ #
+ # def test_perform_enqueued_jobs
+ # perform_enqueued_jobs do
+ # MyJob.perform_later(1, 2, 3)
+ # end
+ # assert_performed_jobs 1
+ # end
+ #
+ # def test_perform_enqueued_jobs_without_block
+ # MyJob.perform_later(1, 2, 3)
+ #
+ # perform_enqueued_jobs
+ #
+ # assert_performed_jobs 1
+ # end
+ #
+ # This method also supports filtering. If the +:only+ option is specified,
+ # then only the listed job(s) will be performed.
+ #
+ # def test_perform_enqueued_jobs_with_only
+ # perform_enqueued_jobs(only: MyJob) do
+ # MyJob.perform_later(1, 2, 3) # will be performed
+ # HelloJob.perform_later(1, 2, 3) # will not be performed
+ # end
+ # assert_performed_jobs 1
+ # end
+ #
+ # Also if the +:except+ option is specified,
+ # then the job(s) except specific class will be performed.
+ #
+ # def test_perform_enqueued_jobs_with_except
+ # perform_enqueued_jobs(except: HelloJob) do
+ # MyJob.perform_later(1, 2, 3) # will be performed
+ # HelloJob.perform_later(1, 2, 3) # will not be performed
+ # end
+ # assert_performed_jobs 1
+ # end
+ #
+ # If the +:queue+ option is specified,
+ # then only the job(s) enqueued to a specific queue will be performed.
+ #
+ # def test_perform_enqueued_jobs_with_queue
+ # perform_enqueued_jobs queue: :some_queue do
+ # MyJob.set(queue: :some_queue).perform_later(1, 2, 3) # will be performed
+ # HelloJob.set(queue: :other_queue).perform_later(1, 2, 3) # will not be performed
+ # end
+ # assert_performed_jobs 1
+ # end
+ #
+ def perform_enqueued_jobs(only: nil, except: nil, queue: nil)
+ return flush_enqueued_jobs(only: only, except: except, queue: queue) unless block_given?
+
+ validate_option(only: only, except: except)
+
+ old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
+ old_perform_enqueued_at_jobs = queue_adapter.perform_enqueued_at_jobs
+ old_filter = queue_adapter.filter
+ old_reject = queue_adapter.reject
+ old_queue = queue_adapter.queue
+
+ begin
+ queue_adapter.perform_enqueued_jobs = true
+ queue_adapter.perform_enqueued_at_jobs = true
+ queue_adapter.filter = only
+ queue_adapter.reject = except
+ queue_adapter.queue = queue
+
+ yield
+ ensure
+ queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs
+ queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs
+ queue_adapter.filter = old_filter
+ queue_adapter.reject = old_reject
+ queue_adapter.queue = old_queue
end
+ end
- delegate :enqueued_jobs, :enqueued_jobs=,
- :performed_jobs, :performed_jobs=,
- to: :queue_adapter
+ # Accesses the queue_adapter set by ActiveJob::Base.
+ #
+ # def test_assert_job_has_custom_queue_adapter_set
+ # assert_instance_of CustomQueueAdapter, HelloJob.queue_adapter
+ # end
+ def queue_adapter
+ ActiveJob::Base.queue_adapter
+ end
- private
- def clear_enqueued_jobs # :nodoc:
- enqueued_jobs.clear
- end
+ private
+ def clear_enqueued_jobs
+ enqueued_jobs.clear
+ end
- def clear_performed_jobs # :nodoc:
- performed_jobs.clear
- end
+ def clear_performed_jobs
+ performed_jobs.clear
+ end
+
+ def jobs_with(jobs, only: nil, except: nil, queue: nil)
+ validate_option(only: only, except: except)
+
+ jobs.count do |job|
+ job_class = job.fetch(:job)
- def enqueued_jobs_size(only: nil) # :nodoc:
if only
- enqueued_jobs.count { |job| Array(only).include?(job.fetch(:job)) }
- else
- enqueued_jobs.count
+ next false unless Array(only).include?(job_class)
+ elsif except
+ next false if Array(except).include?(job_class)
end
- end
- def serialize_args_for_assertion(args) # :nodoc:
- args.dup.tap do |serialized_args|
- serialized_args[:args] = ActiveJob::Arguments.serialize(serialized_args[:args]) if serialized_args[:args]
- serialized_args[:at] = serialized_args[:at].to_f if serialized_args[:at]
+ if queue
+ next false unless queue.to_s == job.fetch(:queue, job_class.queue_name)
end
+
+ yield job if block_given?
+
+ true
+ end
+ end
+
+ def enqueued_jobs_with(only: nil, except: nil, queue: nil, &block)
+ jobs_with(enqueued_jobs, only: only, except: except, queue: queue, &block)
+ end
+
+ def performed_jobs_with(only: nil, except: nil, queue: nil, &block)
+ jobs_with(performed_jobs, only: only, except: except, queue: queue, &block)
+ end
+
+ def flush_enqueued_jobs(only: nil, except: nil, queue: nil)
+ enqueued_jobs_with(only: only, except: except, queue: queue) do |payload|
+ instantiate_job(payload).perform_now
+ queue_adapter.performed_jobs << payload
end
+ end
- def instantiate_job(payload) # :nodoc:
- job = payload[:job].new(*payload[:args])
- job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at)
- job.queue_name = payload[:queue]
- job
+ def prepare_args_for_assertion(args)
+ args.dup.tap do |arguments|
+ arguments[:at] = arguments[:at].to_f if arguments[:at]
end
- end
+ end
+
+ def deserialize_args_for_assertion(job)
+ job.dup.tap do |new_job|
+ new_job[:args] = ActiveJob::Arguments.deserialize(new_job[:args]) if new_job[:args]
+ end
+ end
+
+ def instantiate_job(payload)
+ args = ActiveJob::Arguments.deserialize(payload[:args])
+ job = payload[:job].new(*args)
+ job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at)
+ job.queue_name = payload[:queue]
+ job
+ end
+
+ def queue_adapter_changed_jobs
+ (ActiveJob::Base.descendants << ActiveJob::Base).select do |klass|
+ # only override explicitly set adapters, a quirk of `class_attribute`
+ klass.singleton_class.public_instance_methods(false).include?(:_queue_adapter)
+ end
+ end
+
+ def validate_option(only: nil, except: nil)
+ raise ArgumentError, "Cannot specify both `:only` and `:except` options." if only && except
+ end
end
end
diff --git a/activejob/lib/active_job/timezones.rb b/activejob/lib/active_job/timezones.rb
new file mode 100644
index 0000000000..ac018eb752
--- /dev/null
+++ b/activejob/lib/active_job/timezones.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Timezones #:nodoc:
+ extend ActiveSupport::Concern
+
+ included do
+ around_perform do |job, block|
+ Time.use_zone(job.timezone, &block)
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/translation.rb b/activejob/lib/active_job/translation.rb
index 67e4cf4ab9..0fd9b9fc06 100644
--- a/activejob/lib/active_job/translation.rb
+++ b/activejob/lib/active_job/translation.rb
@@ -1,9 +1,11 @@
+# frozen_string_literal: true
+
module ActiveJob
module Translation #:nodoc:
extend ActiveSupport::Concern
included do
- around_perform do |job, block, _|
+ around_perform do |job, block|
I18n.with_locale(job.locale, &block)
end
end
diff --git a/activejob/lib/active_job/version.rb b/activejob/lib/active_job/version.rb
index 971ba9fe0c..eae7da4d05 100644
--- a/activejob/lib/active_job/version.rb
+++ b/activejob/lib/active_job/version.rb
@@ -1,4 +1,6 @@
-require_relative 'gem_version'
+# frozen_string_literal: true
+
+require_relative "gem_version"
module ActiveJob
# Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt>
diff --git a/activejob/lib/rails/generators/job/job_generator.rb b/activejob/lib/rails/generators/job/job_generator.rb
index 2115fb9f71..03346a7f12 100644
--- a/activejob/lib/rails/generators/job/job_generator.rb
+++ b/activejob/lib/rails/generators/job/job_generator.rb
@@ -1,23 +1,44 @@
-require 'rails/generators/named_base'
+# frozen_string_literal: true
+
+require "rails/generators/named_base"
module Rails # :nodoc:
module Generators # :nodoc:
class JobGenerator < Rails::Generators::NamedBase # :nodoc:
- desc 'This generator creates an active job file at app/jobs'
+ desc "This generator creates an active job file at app/jobs"
- class_option :queue, type: :string, default: 'default', desc: 'The queue name for the generated job'
+ class_option :queue, type: :string, default: "default", desc: "The queue name for the generated job"
- check_class_collision suffix: 'Job'
+ check_class_collision suffix: "Job"
hook_for :test_framework
def self.default_generator_root
- File.dirname(__FILE__)
+ __dir__
end
def create_job_file
- template 'job.rb', File.join('app/jobs', class_path, "#{file_name}_job.rb")
+ template "job.rb", File.join("app/jobs", class_path, "#{file_name}_job.rb")
+
+ in_root do
+ if behavior == :invoke && !File.exist?(application_job_file_name)
+ template "application_job.rb", application_job_file_name
+ end
+ end
end
+
+ private
+ def file_name
+ @_file_name ||= super.sub(/_job\z/i, "")
+ end
+
+ def application_job_file_name
+ @application_job_file_name ||= if mountable_engine?
+ "app/jobs/#{namespaced_path}/application_job.rb"
+ else
+ "app/jobs/application_job.rb"
+ end
+ end
end
end
end
diff --git a/activejob/lib/rails/generators/job/templates/application_job.rb.tt b/activejob/lib/rails/generators/job/templates/application_job.rb.tt
new file mode 100644
index 0000000000..f93745a31a
--- /dev/null
+++ b/activejob/lib/rails/generators/job/templates/application_job.rb.tt
@@ -0,0 +1,9 @@
+<% module_namespacing do -%>
+class ApplicationJob < ActiveJob::Base
+ # Automatically retry jobs that encountered a deadlock
+ # retry_on ActiveRecord::Deadlocked
+
+ # Most jobs are safe to ignore if the underlying records are no longer available
+ # discard_on ActiveJob::DeserializationError
+end
+<% end -%>
diff --git a/activejob/lib/rails/generators/job/templates/job.rb b/activejob/lib/rails/generators/job/templates/job.rb.tt
index 4ad2914a45..4ad2914a45 100644
--- a/activejob/lib/rails/generators/job/templates/job.rb
+++ b/activejob/lib/rails/generators/job/templates/job.rb.tt