aboutsummaryrefslogtreecommitdiffstats
path: root/activejob
diff options
context:
space:
mode:
authorwvengen <dev-rails@willem.engen.nl>2015-03-18 10:48:26 +0100
committerwvengen <dev-rails@willem.engen.nl>2015-09-17 22:17:39 +0200
commit7059ab35f797c163cd8907abcd7d0830b31e56f7 (patch)
tree708ccbf2da4412030b305f79c629a6961eb487c1 /activejob
parent61f9e47feac75a2cf4ed9e092bac036294564168 (diff)
downloadrails-7059ab35f797c163cd8907abcd7d0830b31e56f7.tar.gz
rails-7059ab35f797c163cd8907abcd7d0830b31e56f7.tar.bz2
rails-7059ab35f797c163cd8907abcd7d0830b31e56f7.zip
Add job priorities to ActiveJob
Diffstat (limited to 'activejob')
-rw-r--r--activejob/CHANGELOG.md4
-rw-r--r--activejob/lib/active_job/base.rb2
-rw-r--r--activejob/lib/active_job/core.rb8
-rw-r--r--activejob/lib/active_job/enqueuing.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_adapters/que_adapter.rb4
-rw-r--r--activejob/lib/active_job/queue_priority.rb44
-rw-r--r--activejob/test/cases/queue_priority_test.rb47
-rw-r--r--activejob/test/integration/queuing_test.rb12
-rw-r--r--activejob/test/support/integration/test_case_helpers.rb8
-rw-r--r--activejob/test/support/que/inline.rb1
11 files changed, 132 insertions, 6 deletions
diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md
index 965b556fab..b5742191eb 100644
--- a/activejob/CHANGELOG.md
+++ b/activejob/CHANGELOG.md
@@ -1,3 +1,7 @@
+* Add job priorities to Active Job.
+
+ *wvengen*
+
* Implement a simple `AsyncJob` processor and associated `AsyncAdapter` that
queue jobs to a `concurrent-ruby` thread pool.
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index 5d7c4cfb91..e5f09f65fb 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -1,6 +1,7 @@
require 'active_job/core'
require 'active_job/queue_adapter'
require 'active_job/queue_name'
+require 'active_job/queue_priority'
require 'active_job/enqueuing'
require 'active_job/execution'
require 'active_job/callbacks'
@@ -57,6 +58,7 @@ module ActiveJob #:nodoc:
include Core
include QueueAdapter
include QueueName
+ include QueuePriority
include Enqueuing
include Execution
include Callbacks
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index eac7279309..19b900a285 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -18,6 +18,9 @@ module ActiveJob
# Queue in which the job will reside.
attr_writer :queue_name
+ # Priority that the job will have (lower is more priority).
+ attr_writer :priority
+
# ID optionally provided by adapter
attr_accessor :provider_job_id
@@ -43,6 +46,7 @@ module ActiveJob
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
@@ -51,6 +55,7 @@ module ActiveJob
# VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
# VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
+ # VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
def set(options={})
ConfiguredJob.new(self, options)
end
@@ -62,6 +67,7 @@ module ActiveJob
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
+ @priority = self.class.priority
end
# Returns a hash with the job data that can safely be passed to the
@@ -71,6 +77,7 @@ module ActiveJob
'job_class' => self.class.name,
'job_id' => job_id,
'queue_name' => queue_name,
+ 'priority' => priority,
'arguments' => serialize_arguments(arguments),
'locale' => I18n.locale
}
@@ -99,6 +106,7 @@ module ActiveJob
def deserialize(job_data)
self.job_id = job_data['job_id']
self.queue_name = job_data['queue_name']
+ self.priority = job_data['priority']
self.serialized_arguments = job_data['arguments']
self.locale = job_data['locale'] || I18n.locale
end
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index 98d92385dd..22154457fd 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -32,6 +32,7 @@ module ActiveJob
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
@@ -54,6 +55,7 @@ module ActiveJob
# * <tt>:wait</tt> - Enqueues the job with the specified delay
# * <tt>:wait_until</tt> - Enqueues the job at the time specified
# * <tt>:queue</tt> - Enqueues the job on the specified queue
+ # * <tt>:priority</tt> - Enqueues the job with the specified priority
#
# ==== Examples
#
@@ -61,10 +63,12 @@ module ActiveJob
# my_job_instance.enqueue wait: 5.minutes
# my_job_instance.enqueue queue: :important
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
+ # my_job_instance.enqueue priority: 10
def enqueue(options={})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
+ self.priority = options[:priority].to_i if options[:priority]
run_callbacks :enqueue do
if self.scheduled_at
self.class.queue_adapter.enqueue_at self, self.scheduled_at
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index ac83da2b9c..0a785fad3b 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -14,13 +14,13 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
def enqueue(job) #:nodoc:
- delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
+ delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id
delayed_job
end
def enqueue_at(job, timestamp) #:nodoc:
- delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
+ delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
job.provider_job_id = delayed_job.id
delayed_job
end
diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb
index 90947aa98d..ab13689747 100644
--- a/activejob/lib/active_job/queue_adapters/que_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb
@@ -16,13 +16,13 @@ module ActiveJob
# Rails.application.config.active_job.queue_adapter = :que
class QueAdapter
def enqueue(job) #:nodoc:
- que_job = JobWrapper.enqueue job.serialize
+ que_job = JobWrapper.enqueue job.serialize, priority: job.priority
job.provider_job_id = que_job.attrs["job_id"]
que_job
end
def enqueue_at(job, timestamp) #:nodoc:
- que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp)
+ que_job = JobWrapper.enqueue job.serialize, priority: job.priority, run_at: Time.at(timestamp)
job.provider_job_id = que_job.attrs["job_id"]
que_job
end
diff --git a/activejob/lib/active_job/queue_priority.rb b/activejob/lib/active_job/queue_priority.rb
new file mode 100644
index 0000000000..01d84910ff
--- /dev/null
+++ b/activejob/lib/active_job/queue_priority.rb
@@ -0,0 +1,44 @@
+module ActiveJob
+ module QueuePriority
+ extend ActiveSupport::Concern
+
+ # Includes the ability to override the default queue priority.
+ module ClassMethods
+ mattr_accessor(:default_priority)
+
+ # Specifies the priority of the queue to create the job with.
+ #
+ # class PublishToFeedJob < ActiveJob::Base
+ # queue_with_priority 50
+ #
+ # def perform(post)
+ # post.to_feed!
+ # end
+ # end
+ #
+ # Specify either an argument or a block.
+ def queue_with_priority(priority=nil, &block)
+ if block_given?
+ self.priority = block
+ else
+ self.priority = priority
+ end
+ end
+ end
+
+ included do
+ class_attribute :priority, instance_accessor: false
+
+ self.priority = default_priority
+ end
+
+ # Returns the priority that the job will be created with
+ def priority
+ if @priority.is_a?(Proc)
+ @priority = instance_exec(&@priority)
+ end
+ @priority
+ end
+
+ end
+end
diff --git a/activejob/test/cases/queue_priority_test.rb b/activejob/test/cases/queue_priority_test.rb
new file mode 100644
index 0000000000..ca17b51dad
--- /dev/null
+++ b/activejob/test/cases/queue_priority_test.rb
@@ -0,0 +1,47 @@
+require 'helper'
+require 'jobs/hello_job'
+
+class QueuePriorityTest < ActiveSupport::TestCase
+ test 'priority unset by default' do
+ assert_equal nil, HelloJob.priority
+ end
+
+ test 'uses given priority' do
+ original_priority = HelloJob.priority
+
+ begin
+ HelloJob.queue_with_priority 90
+ assert_equal 90, HelloJob.new.priority
+ ensure
+ HelloJob.priority = original_priority
+ end
+ end
+
+ test 'evals block given to priority to determine priority' do
+ original_priority = HelloJob.priority
+
+ begin
+ HelloJob.queue_with_priority { 25 }
+ assert_equal 25, HelloJob.new.priority
+ ensure
+ HelloJob.priority = original_priority
+ end
+ end
+
+ test 'can use arguments to determine priority in priority block' do
+ original_priority = HelloJob.priority
+
+ begin
+ HelloJob.queue_with_priority { self.arguments.first=='1' ? 99 : 11 }
+ assert_equal 99, HelloJob.new('1').priority
+ assert_equal 11, HelloJob.new('3').priority
+ ensure
+ HelloJob.priority = original_priority
+ end
+ end
+
+ test 'uses priority passed to #set' do
+ job = HelloJob.set(priority: 123).perform_later
+ assert_equal 123, job.priority
+ end
+end
diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb
index 125ba54302..e435ed4aa6 100644
--- a/activejob/test/integration/queuing_test.rb
+++ b/activejob/test/integration/queuing_test.rb
@@ -84,4 +84,16 @@ class QueuingTest < ActiveSupport::TestCase
I18n.locale = :en
end
end
+
+ test 'should run job with higher priority first' do
+ skip unless adapter_is?(:delayed_job, :que)
+
+ wait_until = Time.now + 3.seconds
+ TestJob.set(wait_until: wait_until, priority: 20).perform_later "#{@id}.1"
+ TestJob.set(wait_until: wait_until, priority: 10).perform_later "#{@id}.2"
+ wait_for_jobs_to_finish_for(10.seconds)
+ assert job_executed "#{@id}.1"
+ assert job_executed "#{@id}.2"
+ assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
+ end
end
diff --git a/activejob/test/support/integration/test_case_helpers.rb b/activejob/test/support/integration/test_case_helpers.rb
index 39aee6d407..8319d09520 100644
--- a/activejob/test/support/integration/test_case_helpers.rb
+++ b/activejob/test/support/integration/test_case_helpers.rb
@@ -42,8 +42,12 @@ module TestCaseHelpers
end
end
- def job_executed
- Dummy::Application.root.join("tmp/#{@id}").exist?
+ def job_executed(id=@id)
+ Dummy::Application.root.join("tmp/#{id}").exist?
+ end
+
+ def job_executed_at(id=@id)
+ File.new(Dummy::Application.root.join("tmp/#{id}")).ctime
end
def job_output
diff --git a/activejob/test/support/que/inline.rb b/activejob/test/support/que/inline.rb
index 0232da1370..0950e52d28 100644
--- a/activejob/test/support/que/inline.rb
+++ b/activejob/test/support/que/inline.rb
@@ -6,6 +6,7 @@ Que::Job.class_eval do
if args.last.is_a?(Hash)
options = args.pop
options.delete(:run_at)
+ options.delete(:priority)
args << options unless options.empty?
end
self.run(*args)