diff options
Diffstat (limited to 'activejob/lib/active_job')
-rw-r--r-- | activejob/lib/active_job/base.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/core.rb | 8 | ||||
-rw-r--r-- | activejob/lib/active_job/enqueuing.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/que_adapter.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_priority.rb | 44 |
6 files changed, 62 insertions, 4 deletions
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index 5d7c4cfb91..e5f09f65fb 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -1,6 +1,7 @@ require 'active_job/core' require 'active_job/queue_adapter' require 'active_job/queue_name' +require 'active_job/queue_priority' require 'active_job/enqueuing' require 'active_job/execution' require 'active_job/callbacks' @@ -57,6 +58,7 @@ module ActiveJob #:nodoc: include Core include QueueAdapter include QueueName + include QueuePriority include Enqueuing include Execution include Callbacks diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index eac7279309..19b900a285 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -18,6 +18,9 @@ module ActiveJob # Queue in which the job will reside. attr_writer :queue_name + # Priority that the job will have (lower is more priority). + attr_writer :priority + # ID optionally provided by adapter attr_accessor :provider_job_id @@ -43,6 +46,7 @@ module ActiveJob # * <tt>:wait</tt> - Enqueues the job with the specified delay # * <tt>:wait_until</tt> - Enqueues the job at the time specified # * <tt>:queue</tt> - Enqueues the job on the specified queue + # * <tt>:priority</tt> - Enqueues the job with the specified priority # # ==== Examples # @@ -51,6 +55,7 @@ module ActiveJob # VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last) def set(options={}) ConfiguredJob.new(self, options) end @@ -62,6 +67,7 @@ module ActiveJob @arguments = arguments @job_id = SecureRandom.uuid @queue_name = self.class.queue_name + @priority = self.class.priority end # Returns a hash with the job data that can safely be passed to the @@ -71,6 +77,7 @@ module ActiveJob 'job_class' => self.class.name, 'job_id' => job_id, 'queue_name' => queue_name, + 'priority' => priority, 'arguments' => serialize_arguments(arguments), 'locale' => I18n.locale } @@ -99,6 +106,7 @@ module ActiveJob def deserialize(job_data) self.job_id = job_data['job_id'] self.queue_name = job_data['queue_name'] + self.priority = job_data['priority'] self.serialized_arguments = job_data['arguments'] self.locale = job_data['locale'] || I18n.locale end diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 98d92385dd..22154457fd 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -32,6 +32,7 @@ module ActiveJob # * <tt>:wait</tt> - Enqueues the job with the specified delay # * <tt>:wait_until</tt> - Enqueues the job at the time specified # * <tt>:queue</tt> - Enqueues the job on the specified queue + # * <tt>:priority</tt> - Enqueues the job with the specified priority # # ==== Examples # @@ -54,6 +55,7 @@ module ActiveJob # * <tt>:wait</tt> - Enqueues the job with the specified delay # * <tt>:wait_until</tt> - Enqueues the job at the time specified # * <tt>:queue</tt> - Enqueues the job on the specified queue + # * <tt>:priority</tt> - Enqueues the job with the specified priority # # ==== Examples # @@ -61,10 +63,12 @@ module ActiveJob # my_job_instance.enqueue wait: 5.minutes # my_job_instance.enqueue queue: :important # my_job_instance.enqueue wait_until: Date.tomorrow.midnight + # my_job_instance.enqueue priority: 10 def enqueue(options={}) self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] self.scheduled_at = options[:wait_until].to_f if options[:wait_until] self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] + self.priority = options[:priority].to_i if options[:priority] run_callbacks :enqueue do if self.scheduled_at self.class.queue_adapter.enqueue_at self, self.scheduled_at diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index ac83da2b9c..0a785fad3b 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -14,13 +14,13 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :delayed_job class DelayedJobAdapter def enqueue(job) #:nodoc: - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority) job.provider_job_id = delayed_job.id delayed_job end def enqueue_at(job, timestamp) #:nodoc: - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)) job.provider_job_id = delayed_job.id delayed_job end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index 90947aa98d..ab13689747 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -16,13 +16,13 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :que class QueAdapter def enqueue(job) #:nodoc: - que_job = JobWrapper.enqueue job.serialize + que_job = JobWrapper.enqueue job.serialize, priority: job.priority job.provider_job_id = que_job.attrs["job_id"] que_job end def enqueue_at(job, timestamp) #:nodoc: - que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp) + que_job = JobWrapper.enqueue job.serialize, priority: job.priority, run_at: Time.at(timestamp) job.provider_job_id = que_job.attrs["job_id"] que_job end diff --git a/activejob/lib/active_job/queue_priority.rb b/activejob/lib/active_job/queue_priority.rb new file mode 100644 index 0000000000..01d84910ff --- /dev/null +++ b/activejob/lib/active_job/queue_priority.rb @@ -0,0 +1,44 @@ +module ActiveJob + module QueuePriority + extend ActiveSupport::Concern + + # Includes the ability to override the default queue priority. + module ClassMethods + mattr_accessor(:default_priority) + + # Specifies the priority of the queue to create the job with. + # + # class PublishToFeedJob < ActiveJob::Base + # queue_with_priority 50 + # + # def perform(post) + # post.to_feed! + # end + # end + # + # Specify either an argument or a block. + def queue_with_priority(priority=nil, &block) + if block_given? + self.priority = block + else + self.priority = priority + end + end + end + + included do + class_attribute :priority, instance_accessor: false + + self.priority = default_priority + end + + # Returns the priority that the job will be created with + def priority + if @priority.is_a?(Proc) + @priority = instance_exec(&@priority) + end + @priority + end + + end +end |