aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib/active_job
diff options
context:
space:
mode:
Diffstat (limited to 'activejob/lib/active_job')
-rw-r--r--activejob/lib/active_job/arguments.rb21
-rw-r--r--activejob/lib/active_job/async_job.rb74
-rw-r--r--activejob/lib/active_job/base.rb4
-rw-r--r--activejob/lib/active_job/core.rb15
-rw-r--r--activejob/lib/active_job/enqueuing.rb4
-rw-r--r--activejob/lib/active_job/logging.rb18
-rw-r--r--activejob/lib/active_job/queue_adapter.rb6
-rw-r--r--activejob/lib/active_job/queue_adapters.rb42
-rw-r--r--activejob/lib/active_job/queue_adapters/async_adapter.rb23
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/que_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb8
-rw-r--r--activejob/lib/active_job/queue_adapters/sneakers_adapter.rb2
-rw-r--r--activejob/lib/active_job/queue_name.rb2
-rw-r--r--activejob/lib/active_job/queue_priority.rb44
-rw-r--r--activejob/lib/active_job/test_helper.rb60
-rw-r--r--activejob/lib/active_job/translation.rb11
17 files changed, 283 insertions, 59 deletions
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb
index cdb37879b6..8e462bfe5d 100644
--- a/activejob/lib/active_job/arguments.rb
+++ b/activejob/lib/active_job/arguments.rb
@@ -16,13 +16,13 @@ module ActiveJob
end
end
- # Raised when an unsupported argument type is being set as job argument. We
+ # Raised when an unsupported argument type is set as a job argument. We
# currently support NilClass, Fixnum, Float, String, TrueClass, FalseClass,
- # Bignum and object that can be represented as GlobalIDs (ex: Active Record).
- # Also raised if you set the key for a Hash something else than a string or
- # a symbol.
- class SerializationError < ArgumentError
- end
+ # Bignum and objects that can be represented as GlobalIDs (ex: Active Record).
+ # Raised if you set the key for a Hash something else than a string or
+ # a symbol. Also raised when trying to serialize an object which can't be
+ # identified with a Global ID - such as an unpersisted Active Record model.
+ class SerializationError < ArgumentError; end
module Arguments
extend self
@@ -59,7 +59,7 @@ module ActiveJob
when *TYPE_WHITELIST
argument
when GlobalID::Identification
- { GLOBALID_KEY => argument.to_global_id.to_s }
+ convert_to_global_id_hash(argument)
when Array
argument.map { |arg| serialize_argument(arg) }
when ActiveSupport::HashWithIndifferentAccess
@@ -147,5 +147,12 @@ module ActiveJob
end
end
end
+
+ def convert_to_global_id_hash(argument)
+ { GLOBALID_KEY => argument.to_global_id.to_s }
+ rescue URI::GID::MissingModelIdError
+ raise SerializationError, "Unable to serialize #{argument.class} " \
+ "without an id. (Maybe you forgot to call save?)"
+ end
end
end
diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb
new file mode 100644
index 0000000000..6c1c070994
--- /dev/null
+++ b/activejob/lib/active_job/async_job.rb
@@ -0,0 +1,74 @@
+require 'concurrent'
+
+module ActiveJob
+ # == Active Job Async Job
+ #
+ # When enqueueing jobs with Async Job each job will be executed asynchronously
+ # on a +concurrent-ruby+ thread pool. All job data is retained in memory.
+ # Because job data is not saved to a persistent datastore there is no
+ # additional infrastructure needed and jobs process quickly. The lack of
+ # persistence, however, means that all unprocessed jobs will be lost on
+ # application restart. Therefore in-memory queue adapters are unsuitable for
+ # most production environments but are excellent for development and testing.
+ #
+ # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
+ #
+ # To use Async Job set the queue_adapter config to +:async+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :async
+ #
+ # Async Job supports job queues specified with +queue_as+. Queues are created
+ # automatically as needed and each has its own thread pool.
+ class AsyncJob
+
+ DEFAULT_EXECUTOR_OPTIONS = {
+ min_threads: [2, Concurrent.processor_count].max,
+ max_threads: Concurrent.processor_count * 10,
+ auto_terminate: true,
+ idletime: 60, # 1 minute
+ max_queue: 0, # unlimited
+ fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
+ }.freeze
+
+ QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
+ hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
+ end
+
+ class << self
+ # Forces jobs to process immediately when testing the Active Job gem.
+ # This should only be called from within unit tests.
+ def perform_immediately! #:nodoc:
+ @perform_immediately = true
+ end
+
+ # Allows jobs to run asynchronously when testing the Active Job gem.
+ # This should only be called from within unit tests.
+ def perform_asynchronously! #:nodoc:
+ @perform_immediately = false
+ end
+
+ def create_thread_pool #:nodoc:
+ if @perform_immediately
+ Concurrent::ImmediateExecutor.new
+ else
+ Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
+ end
+ end
+
+ def enqueue(job_data, queue: 'default') #:nodoc:
+ QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) }
+ end
+
+ def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
+ delay = timestamp - Time.current.to_f
+ if delay > 0
+ Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job|
+ ActiveJob::Base.execute(job)
+ end
+ else
+ enqueue(job_data, queue: queue)
+ end
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index fd49b3fda5..e5f09f65fb 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -1,10 +1,12 @@
require 'active_job/core'
require 'active_job/queue_adapter'
require 'active_job/queue_name'
+require 'active_job/queue_priority'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
require 'active_job/logging'
+require 'active_job/translation'
module ActiveJob #:nodoc:
# = Active Job
@@ -56,10 +58,12 @@ module ActiveJob #:nodoc:
include Core
include QueueAdapter
include QueueName
+ include QueuePriority
include Enqueuing
include Execution
include Callbacks
include Logging
+ include Translation
ActiveSupport.run_load_hooks(:active_job, self)
end
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index 0528572cd0..19b900a285 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -18,8 +18,14 @@ module ActiveJob
# Queue in which the job will reside.
attr_writer :queue_name
+ # Priority that the job will have (lower is more priority).
+ attr_writer :priority
+
# ID optionally provided by adapter
attr_accessor :provider_job_id
+
+ # I18n.locale to be used during the job.
+ attr_accessor :locale
end
# These methods will be included into any Active Job object, adding
@@ -40,6 +46,7 @@ module ActiveJob
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
@@ -48,6 +55,7 @@ module ActiveJob
# VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
+ # VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
def set(options={})
ConfiguredJob.new(self, options)
end
@@ -59,6 +67,7 @@ module ActiveJob
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
+ @priority = self.class.priority
end
# Returns a hash with the job data that can safely be passed to the
@@ -68,7 +77,9 @@ module ActiveJob
'job_class' => self.class.name,
'job_id' => job_id,
'queue_name' => queue_name,
- 'arguments' => serialize_arguments(arguments)
+ 'priority' => priority,
+ 'arguments' => serialize_arguments(arguments),
+ 'locale' => I18n.locale
}
end
@@ -95,7 +106,9 @@ module ActiveJob
def deserialize(job_data)
self.job_id = job_data['job_id']
self.queue_name = job_data['queue_name']
+ self.priority = job_data['priority']
self.serialized_arguments = job_data['arguments']
+ self.locale = job_data['locale'] || I18n.locale
end
private
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 98d92385dd..22154457fd 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -32,6 +32,7 @@ module ActiveJob
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
@@ -54,6 +55,7 @@ module ActiveJob
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
@@ -61,10 +63,12 @@ module ActiveJob
# my_job_instance.enqueue wait: 5.minutes
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
+ # my_job_instance.enqueue priority: 10
def enqueue(options={})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
+ self.priority = options[:priority].to_i if options[:priority]
run_callbacks :enqueue do
if self.scheduled_at
self.class.queue_adapter.enqueue_at self, self.scheduled_at
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index 54774db601..605057d1e8 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -1,3 +1,4 @@
+require 'active_support/core_ext/hash/transform_values'
require 'active_support/core_ext/string/filters'
require 'active_support/tagged_logging'
require 'active_support/logger'
@@ -25,7 +26,7 @@ module ActiveJob
end
end
- before_enqueue do |job|
+ after_enqueue do |job|
if job.scheduled_at
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
adapter: job.class.queue_adapter, job: job
@@ -87,12 +88,25 @@ module ActiveJob
def args_info(job)
if job.arguments.any?
' with arguments: ' +
- job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg.inspect }.join(', ')
+ job.arguments.map { |arg| format(arg).inspect }.join(', ')
else
''
end
end
+ def format(arg)
+ case arg
+ when Hash
+ arg.transform_values { |value| format(value) }
+ when Array
+ arg.map { |value| format(value) }
+ when GlobalID::Identification
+ arg.to_global_id rescue arg
+ else
+ arg
+ end
+ end
+
def scheduled_at(event)
Time.at(event.payload[:job].scheduled_at).utc
end
diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb
index 9c4519432d..457015b741 100644
--- a/activejob/lib/active_job/queue_adapter.rb
+++ b/activejob/lib/active_job/queue_adapter.rb
@@ -4,7 +4,7 @@ require 'active_support/core_ext/string/inflections'
module ActiveJob
# The <tt>ActiveJob::QueueAdapter</tt> module is used to load the
- # correct adapter. The default queue adapter is the :inline queue.
+ # correct adapter. The default queue adapter is the +:inline+ queue.
module QueueAdapter #:nodoc:
extend ActiveSupport::Concern
@@ -15,12 +15,14 @@ module ActiveJob
# Includes the setter method for changing the active queue adapter.
module ClassMethods
+ # Returns the backend queue provider. The default queue adapter
+ # is the +:inline+ queue. See QueueAdapters for more information.
def queue_adapter
_queue_adapter
end
# Specify the backend queue provider. The default queue adapter
- # is the :inline queue. See QueueAdapters for more
+ # is the +:inline+ queue. See QueueAdapters for more
# information.
def queue_adapter=(name_or_adapter_or_class)
self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index bd69e525bb..aeb1fe1e73 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -12,21 +12,24 @@ module ActiveJob
# * {Sidekiq}[http://sidekiq.org]
# * {Sneakers}[https://github.com/jondot/sneakers]
# * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
+ # * {Active Job Async Job}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html]
+ # * {Active Job Inline}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html]
#
# === Backends Features
#
- # | | Async | Queues | Delayed | Priorities | Timeout | Retries |
- # |-------------------|-------|--------|-----------|------------|---------|---------|
- # | Backburner | Yes | Yes | Yes | Yes | Job | Global |
- # | Delayed Job | Yes | Yes | Yes | Job | Global | Global |
- # | Qu | Yes | Yes | No | No | No | Global |
- # | Que | Yes | Yes | Yes | Job | No | Job |
- # | queue_classic | Yes | Yes | No* | No | No | No |
- # | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes |
- # | Sidekiq | Yes | Yes | Yes | Queue | No | Job |
- # | Sneakers | Yes | Yes | No | Queue | Queue | No |
- # | Sucker Punch | Yes | Yes | No | No | No | No |
- # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
+ # | | Async | Queues | Delayed | Priorities | Timeout | Retries |
+ # |-------------------|-------|--------|------------|------------|---------|---------|
+ # | Backburner | Yes | Yes | Yes | Yes | Job | Global |
+ # | Delayed Job | Yes | Yes | Yes | Job | Global | Global |
+ # | Qu | Yes | Yes | No | No | No | Global |
+ # | Que | Yes | Yes | Yes | Job | No | Job |
+ # | queue_classic | Yes | Yes | Yes* | No | No | No |
+ # | Resque | Yes | Yes | Yes (Gem) | Queue | Global | Yes |
+ # | Sidekiq | Yes | Yes | Yes | Queue | No | Job |
+ # | Sneakers | Yes | Yes | No | Queue | Queue | No |
+ # | Sucker Punch | Yes | Yes | No | No | No | No |
+ # | Active Job Async | Yes | Yes | Yes | No | No | No |
+ # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
#
# ==== Async
#
@@ -50,9 +53,8 @@ module ActiveJob
# N/A: The adapter does not support queueing.
#
# NOTE:
- # queue_classic does not support job scheduling.
- # However, you can use the queue_classic-later gem.
- # See the documentation for ActiveJob::QueueAdapters::QueueClassicAdapter.
+ # queue_classic supports job scheduling since version 3.1.
+ # For older versions you can use the queue_classic-later gem.
#
# ==== Priorities
#
@@ -97,9 +99,15 @@ module ActiveJob
#
# N/A: The adapter does not run in a separate process, and therefore doesn't
# support retries.
+ #
+ # === Async and Inline Queue Adapters
+ #
+ # Active Job has two built-in queue adapters intended for development and
+ # testing: +:async+ and +:inline+.
module QueueAdapters
extend ActiveSupport::Autoload
+ autoload :AsyncAdapter
autoload :InlineAdapter
autoload :BackburnerAdapter
autoload :DelayedJobAdapter
@@ -116,6 +124,10 @@ module ActiveJob
private_constant :ADAPTER
class << self
+ # Returns adapter for specified name.
+ #
+ # ActiveJob::QueueAdapters.lookup(:sidekiq)
+ # # => ActiveJob::QueueAdapters::SidekiqAdapter
def lookup(name)
const_get(name.to_s.camelize << ADAPTER)
end
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb
new file mode 100644
index 0000000000..3fc27f56e7
--- /dev/null
+++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb
@@ -0,0 +1,23 @@
+require 'active_job/async_job'
+
+module ActiveJob
+ module QueueAdapters
+ # == Active Job Async adapter
+ #
+ # When enqueueing jobs with the Async adapter the job will be executed
+ # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html].
+ #
+ # To use +AsyncJob+ set the queue_adapter config to +:async+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :async
+ class AsyncAdapter
+ def enqueue(job) #:nodoc:
+ ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name)
+ end
+
+ def enqueue_at(job, timestamp) #:nodoc:
+ ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name)
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index ac83da2b9c..0a785fad3b 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -14,13 +14,13 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
def enqueue(job) #:nodoc:
- delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
+ delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id
delayed_job
end
def enqueue_at(job, timestamp) #:nodoc:
- delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
+ delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
job.provider_job_id = delayed_job.id
delayed_job
end
diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb
index 90947aa98d..ab13689747 100644
--- a/activejob/lib/active_job/queue_adapters/que_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb
@@ -16,13 +16,13 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :que
class QueAdapter
def enqueue(job) #:nodoc:
- que_job = JobWrapper.enqueue job.serialize
+ que_job = JobWrapper.enqueue job.serialize, priority: job.priority
job.provider_job_id = que_job.attrs["job_id"]
que_job
end
def enqueue_at(job, timestamp) #:nodoc:
- que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp)
+ que_job = JobWrapper.enqueue job.serialize, priority: job.priority, run_at: Time.at(timestamp)
job.provider_job_id = que_job.attrs["job_id"]
que_job
end
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index 059754a87f..0ee41407d8 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -18,7 +18,9 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :queue_classic
class QueueClassicAdapter
def enqueue(job) #:nodoc:
- build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
+ qc_job = build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
+ job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash)
+ qc_job
end
def enqueue_at(job, timestamp) #:nodoc:
@@ -28,7 +30,9 @@ module ActiveJob
'the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. ' \
'You can implement this yourself or you can use the queue_classic-later gem.'
end
- queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
+ qc_job = queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize)
+ job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash)
+ qc_job
end
# Builds a <tt>QC::Queue</tt> object to schedule jobs on.
diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
index f102c6567e..d78bdecdcb 100644
--- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -1,5 +1,5 @@
require 'sneakers'
-require 'thread'
+require 'monitor'
module ActiveJob
module QueueAdapters
diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb
index 9ae0345120..65786a49ff 100644
--- a/activejob/lib/active_job/queue_name.rb
+++ b/activejob/lib/active_job/queue_name.rb
@@ -39,7 +39,7 @@ module ActiveJob
self.queue_name_delimiter = '_' # set default delimiter to '_'
end
- # Returns the name of the queue the job will be run on
+ # Returns the name of the queue the job will be run on.
def queue_name
if @queue_name.is_a?(Proc)
@queue_name = self.class.queue_name_from_part(instance_exec(&@queue_name))
diff --git a/activejob/lib/active_job/queue_priority.rb b/activejob/lib/active_job/queue_priority.rb
new file mode 100644
index 0000000000..01d84910ff
--- /dev/null
+++ b/activejob/lib/active_job/queue_priority.rb
@@ -0,0 +1,44 @@
+module ActiveJob
+ module QueuePriority
+ extend ActiveSupport::Concern
+
+ # Includes the ability to override the default queue priority.
+ module ClassMethods
+ mattr_accessor(:default_priority)
+
+ # Specifies the priority of the queue to create the job with.
+ #
+ # class PublishToFeedJob < ActiveJob::Base
+ # queue_with_priority 50
+ #
+ # def perform(post)
+ # post.to_feed!
+ # end
+ # end
+ #
+ # Specify either an argument or a block.
+ def queue_with_priority(priority=nil, &block)
+ if block_given?
+ self.priority = block
+ else
+ self.priority = priority
+ end
+ end
+ end
+
+ included do
+ class_attribute :priority, instance_accessor: false
+
+ self.priority = default_priority
+ end
+
+ # Returns the priority that the job will be created with
+ def priority
+ if @priority.is_a?(Proc)
+ @priority = instance_exec(&@priority)
+ end
+ @priority
+ end
+
+ end
+end
diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb
index 9b307e8dc8..44ddfa5f69 100644
--- a/activejob/lib/active_job/test_helper.rb
+++ b/activejob/lib/active_job/test_helper.rb
@@ -7,7 +7,7 @@ module ActiveJob
extend ActiveSupport::Concern
included do
- def before_setup
+ def before_setup # :nodoc:
test_adapter = ActiveJob::QueueAdapters::TestAdapter.new
@old_queue_adapters = (ActiveJob::Base.subclasses << ActiveJob::Base).select do |klass|
@@ -24,7 +24,7 @@ module ActiveJob
super
end
- def after_teardown
+ def after_teardown # :nodoc:
super
@old_queue_adapters.each do |(klass, adapter)|
klass.queue_adapter = adapter
@@ -226,19 +226,22 @@ module ActiveJob
# assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do
# MyJob.perform_later(1,2,3)
# end
+ #
+ # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) do
+ # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
+ # end
# end
- def assert_enqueued_with(args = {}, &_block)
- original_enqueued_jobs = enqueued_jobs.dup
- clear_enqueued_jobs
+ def assert_enqueued_with(args = {})
+ original_enqueued_jobs_count = enqueued_jobs.count
args.assert_valid_keys(:job, :args, :at, :queue)
serialized_args = serialize_args_for_assertion(args)
yield
- matching_job = enqueued_jobs.any? do |job|
+ in_block_jobs = enqueued_jobs.drop(original_enqueued_jobs_count)
+ matching_job = in_block_jobs.find do |job|
serialized_args.all? { |key, value| value == job[key] }
end
assert matching_job, "No enqueued job found with #{args}"
- ensure
- queue_adapter.enqueued_jobs = original_enqueued_jobs + enqueued_jobs
+ instantiate_job(matching_job)
end
# Asserts that the job passed in the block has been performed with the given arguments.
@@ -247,19 +250,22 @@ module ActiveJob
# assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do
# MyJob.perform_later(1,2,3)
# end
+ #
+ # assert_performed_with(job: MyJob, at: Date.tomorrow.noon) do
+ # MyJob.set(wait_until: Date.tomorrow.noon).perform_later
+ # end
# end
- def assert_performed_with(args = {}, &_block)
- original_performed_jobs = performed_jobs.dup
- clear_performed_jobs
+ def assert_performed_with(args = {})
+ original_performed_jobs_count = performed_jobs.count
args.assert_valid_keys(:job, :args, :at, :queue)
serialized_args = serialize_args_for_assertion(args)
perform_enqueued_jobs { yield }
- matching_job = performed_jobs.any? do |job|
+ in_block_jobs = performed_jobs.drop(original_performed_jobs_count)
+ matching_job = in_block_jobs.find do |job|
serialized_args.all? { |key, value| value == job[key] }
end
assert matching_job, "No performed job found with #{args}"
- ensure
- queue_adapter.performed_jobs = original_performed_jobs + performed_jobs
+ instantiate_job(matching_job)
end
def perform_enqueued_jobs(only: nil)
@@ -288,28 +294,34 @@ module ActiveJob
to: :queue_adapter
private
- def clear_enqueued_jobs
+ def clear_enqueued_jobs # :nodoc:
enqueued_jobs.clear
end
- def clear_performed_jobs
+ def clear_performed_jobs # :nodoc:
performed_jobs.clear
end
- def enqueued_jobs_size(only: nil)
+ def enqueued_jobs_size(only: nil) # :nodoc:
if only
- enqueued_jobs.select { |job| job.fetch(:job) == only }.size
+ enqueued_jobs.count { |job| Array(only).include?(job.fetch(:job)) }
else
- enqueued_jobs.size
+ enqueued_jobs.count
end
end
- def serialize_args_for_assertion(args)
- serialized_args = args.dup
- if job_args = serialized_args.delete(:args)
- serialized_args[:args] = ActiveJob::Arguments.serialize(job_args)
+ def serialize_args_for_assertion(args) # :nodoc:
+ args.dup.tap do |serialized_args|
+ serialized_args[:args] = ActiveJob::Arguments.serialize(serialized_args[:args]) if serialized_args[:args]
+ serialized_args[:at] = serialized_args[:at].to_f if serialized_args[:at]
end
- serialized_args
+ end
+
+ def instantiate_job(payload) # :nodoc:
+ job = payload[:job].new(*payload[:args])
+ job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at)
+ job.queue_name = payload[:queue]
+ job
end
end
end
diff --git a/activejob/lib/active_job/translation.rb b/activejob/lib/active_job/translation.rb
new file mode 100644
index 0000000000..67e4cf4ab9
--- /dev/null
+++ b/activejob/lib/active_job/translation.rb
@@ -0,0 +1,11 @@
+module ActiveJob
+ module Translation #:nodoc:
+ extend ActiveSupport::Concern
+
+ included do
+ around_perform do |job, block, _|
+ I18n.with_locale(job.locale, &block)
+ end
+ end
+ end
+end