diff options
Diffstat (limited to 'activejob/lib')
-rw-r--r-- | activejob/lib/active_job.rb | 1 | ||||
-rw-r--r-- | activejob/lib/active_job/async_job.rb | 74 | ||||
-rw-r--r-- | activejob/lib/active_job/base.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/core.rb | 8 | ||||
-rw-r--r-- | activejob/lib/active_job/enqueuing.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/logging.rb | 18 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters.rb | 9 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/async_adapter.rb | 23 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/que_adapter.rb | 4 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/sneakers_adapter.rb | 2 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_priority.rb | 44 | ||||
-rw-r--r-- | activejob/lib/active_job/test_helper.rb | 49 |
13 files changed, 212 insertions, 30 deletions
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index 3d4f63b261..eb8091a805 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -32,6 +32,7 @@ module ActiveJob autoload :Base autoload :QueueAdapters autoload :ConfiguredJob + autoload :AsyncJob autoload :TestCase autoload :TestHelper end diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb new file mode 100644 index 0000000000..6c1c070994 --- /dev/null +++ b/activejob/lib/active_job/async_job.rb @@ -0,0 +1,74 @@ +require 'concurrent' + +module ActiveJob + # == Active Job Async Job + # + # When enqueueing jobs with Async Job each job will be executed asynchronously + # on a +concurrent-ruby+ thread pool. All job data is retained in memory. + # Because job data is not saved to a persistent datastore there is no + # additional infrastructure needed and jobs process quickly. The lack of + # persistence, however, means that all unprocessed jobs will be lost on + # application restart. Therefore in-memory queue adapters are unsuitable for + # most production environments but are excellent for development and testing. + # + # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby]. + # + # To use Async Job set the queue_adapter config to +:async+. + # + # Rails.application.config.active_job.queue_adapter = :async + # + # Async Job supports job queues specified with +queue_as+. Queues are created + # automatically as needed and each has its own thread pool. + class AsyncJob + + DEFAULT_EXECUTOR_OPTIONS = { + min_threads: [2, Concurrent.processor_count].max, + max_threads: Concurrent.processor_count * 10, + auto_terminate: true, + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + }.freeze + + QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc: + hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool } + end + + class << self + # Forces jobs to process immediately when testing the Active Job gem. + # This should only be called from within unit tests. + def perform_immediately! #:nodoc: + @perform_immediately = true + end + + # Allows jobs to run asynchronously when testing the Active Job gem. + # This should only be called from within unit tests. + def perform_asynchronously! #:nodoc: + @perform_immediately = false + end + + def create_thread_pool #:nodoc: + if @perform_immediately + Concurrent::ImmediateExecutor.new + else + Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS) + end + end + + def enqueue(job_data, queue: 'default') #:nodoc: + QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) } + end + + def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc: + delay = timestamp - Time.current.to_f + if delay > 0 + Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job| + ActiveJob::Base.execute(job) + end + else + enqueue(job_data, queue: queue) + end + end + end + end +end diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb index 5d7c4cfb91..e5f09f65fb 100644 --- a/activejob/lib/active_job/base.rb +++ b/activejob/lib/active_job/base.rb @@ -1,6 +1,7 @@ require 'active_job/core' require 'active_job/queue_adapter' require 'active_job/queue_name' +require 'active_job/queue_priority' require 'active_job/enqueuing' require 'active_job/execution' require 'active_job/callbacks' @@ -57,6 +58,7 @@ module ActiveJob #:nodoc: include Core include QueueAdapter include QueueName + include QueuePriority include Enqueuing include Execution include Callbacks diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb index eac7279309..19b900a285 100644 --- a/activejob/lib/active_job/core.rb +++ b/activejob/lib/active_job/core.rb @@ -18,6 +18,9 @@ module ActiveJob # Queue in which the job will reside. attr_writer :queue_name + # Priority that the job will have (lower is more priority). + attr_writer :priority + # ID optionally provided by adapter attr_accessor :provider_job_id @@ -43,6 +46,7 @@ module ActiveJob # * <tt>:wait</tt> - Enqueues the job with the specified delay # * <tt>:wait_until</tt> - Enqueues the job at the time specified # * <tt>:queue</tt> - Enqueues the job on the specified queue + # * <tt>:priority</tt> - Enqueues the job with the specified priority # # ==== Examples # @@ -51,6 +55,7 @@ module ActiveJob # VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last) # VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last) + # VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last) def set(options={}) ConfiguredJob.new(self, options) end @@ -62,6 +67,7 @@ module ActiveJob @arguments = arguments @job_id = SecureRandom.uuid @queue_name = self.class.queue_name + @priority = self.class.priority end # Returns a hash with the job data that can safely be passed to the @@ -71,6 +77,7 @@ module ActiveJob 'job_class' => self.class.name, 'job_id' => job_id, 'queue_name' => queue_name, + 'priority' => priority, 'arguments' => serialize_arguments(arguments), 'locale' => I18n.locale } @@ -99,6 +106,7 @@ module ActiveJob def deserialize(job_data) self.job_id = job_data['job_id'] self.queue_name = job_data['queue_name'] + self.priority = job_data['priority'] self.serialized_arguments = job_data['arguments'] self.locale = job_data['locale'] || I18n.locale end diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 98d92385dd..22154457fd 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -32,6 +32,7 @@ module ActiveJob # * <tt>:wait</tt> - Enqueues the job with the specified delay # * <tt>:wait_until</tt> - Enqueues the job at the time specified # * <tt>:queue</tt> - Enqueues the job on the specified queue + # * <tt>:priority</tt> - Enqueues the job with the specified priority # # ==== Examples # @@ -54,6 +55,7 @@ module ActiveJob # * <tt>:wait</tt> - Enqueues the job with the specified delay # * <tt>:wait_until</tt> - Enqueues the job at the time specified # * <tt>:queue</tt> - Enqueues the job on the specified queue + # * <tt>:priority</tt> - Enqueues the job with the specified priority # # ==== Examples # @@ -61,10 +63,12 @@ module ActiveJob # my_job_instance.enqueue wait: 5.minutes # my_job_instance.enqueue queue: :important # my_job_instance.enqueue wait_until: Date.tomorrow.midnight + # my_job_instance.enqueue priority: 10 def enqueue(options={}) self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait] self.scheduled_at = options[:wait_until].to_f if options[:wait_until] self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue] + self.priority = options[:priority].to_i if options[:priority] run_callbacks :enqueue do if self.scheduled_at self.class.queue_adapter.enqueue_at self, self.scheduled_at diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb index 54774db601..605057d1e8 100644 --- a/activejob/lib/active_job/logging.rb +++ b/activejob/lib/active_job/logging.rb @@ -1,3 +1,4 @@ +require 'active_support/core_ext/hash/transform_values' require 'active_support/core_ext/string/filters' require 'active_support/tagged_logging' require 'active_support/logger' @@ -25,7 +26,7 @@ module ActiveJob end end - before_enqueue do |job| + after_enqueue do |job| if job.scheduled_at ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: job.class.queue_adapter, job: job @@ -87,12 +88,25 @@ module ActiveJob def args_info(job) if job.arguments.any? ' with arguments: ' + - job.arguments.map { |arg| arg.try(:to_global_id).try(:to_s) || arg.inspect }.join(', ') + job.arguments.map { |arg| format(arg).inspect }.join(', ') else '' end end + def format(arg) + case arg + when Hash + arg.transform_values { |value| format(value) } + when Array + arg.map { |value| format(value) } + when GlobalID::Identification + arg.to_global_id rescue arg + else + arg + end + end + def scheduled_at(event) Time.at(event.payload[:job].scheduled_at).utc end diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb index e8ceabaeba..aeb1fe1e73 100644 --- a/activejob/lib/active_job/queue_adapters.rb +++ b/activejob/lib/active_job/queue_adapters.rb @@ -12,6 +12,8 @@ module ActiveJob # * {Sidekiq}[http://sidekiq.org] # * {Sneakers}[https://github.com/jondot/sneakers] # * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch] + # * {Active Job Async Job}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html] + # * {Active Job Inline}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html] # # === Backends Features # @@ -26,6 +28,7 @@ module ActiveJob # | Sidekiq | Yes | Yes | Yes | Queue | No | Job | # | Sneakers | Yes | Yes | No | Queue | Queue | No | # | Sucker Punch | Yes | Yes | No | No | No | No | + # | Active Job Async | Yes | Yes | Yes | No | No | No | # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A | # # ==== Async @@ -96,9 +99,15 @@ module ActiveJob # # N/A: The adapter does not run in a separate process, and therefore doesn't # support retries. + # + # === Async and Inline Queue Adapters + # + # Active Job has two built-in queue adapters intended for development and + # testing: +:async+ and +:inline+. module QueueAdapters extend ActiveSupport::Autoload + autoload :AsyncAdapter autoload :InlineAdapter autoload :BackburnerAdapter autoload :DelayedJobAdapter diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb new file mode 100644 index 0000000000..3fc27f56e7 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -0,0 +1,23 @@ +require 'active_job/async_job' + +module ActiveJob + module QueueAdapters + # == Active Job Async adapter + # + # When enqueueing jobs with the Async adapter the job will be executed + # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]. + # + # To use +AsyncJob+ set the queue_adapter config to +:async+. + # + # Rails.application.config.active_job.queue_adapter = :async + class AsyncAdapter + def enqueue(job) #:nodoc: + ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name) + end + + def enqueue_at(job, timestamp) #:nodoc: + ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name) + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb index ac83da2b9c..0a785fad3b 100644 --- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -14,13 +14,13 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :delayed_job class DelayedJobAdapter def enqueue(job) #:nodoc: - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority) job.provider_job_id = delayed_job.id delayed_job end def enqueue_at(job, timestamp) #:nodoc: - delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp)) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)) job.provider_job_id = delayed_job.id delayed_job end diff --git a/activejob/lib/active_job/queue_adapters/que_adapter.rb b/activejob/lib/active_job/queue_adapters/que_adapter.rb index 90947aa98d..ab13689747 100644 --- a/activejob/lib/active_job/queue_adapters/que_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/que_adapter.rb @@ -16,13 +16,13 @@ module ActiveJob # Rails.application.config.active_job.queue_adapter = :que class QueAdapter def enqueue(job) #:nodoc: - que_job = JobWrapper.enqueue job.serialize + que_job = JobWrapper.enqueue job.serialize, priority: job.priority job.provider_job_id = que_job.attrs["job_id"] que_job end def enqueue_at(job, timestamp) #:nodoc: - que_job = JobWrapper.enqueue job.serialize, run_at: Time.at(timestamp) + que_job = JobWrapper.enqueue job.serialize, priority: job.priority, run_at: Time.at(timestamp) job.provider_job_id = que_job.attrs["job_id"] que_job end diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb index f102c6567e..d78bdecdcb 100644 --- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -1,5 +1,5 @@ require 'sneakers' -require 'thread' +require 'monitor' module ActiveJob module QueueAdapters diff --git a/activejob/lib/active_job/queue_priority.rb b/activejob/lib/active_job/queue_priority.rb new file mode 100644 index 0000000000..01d84910ff --- /dev/null +++ b/activejob/lib/active_job/queue_priority.rb @@ -0,0 +1,44 @@ +module ActiveJob + module QueuePriority + extend ActiveSupport::Concern + + # Includes the ability to override the default queue priority. + module ClassMethods + mattr_accessor(:default_priority) + + # Specifies the priority of the queue to create the job with. + # + # class PublishToFeedJob < ActiveJob::Base + # queue_with_priority 50 + # + # def perform(post) + # post.to_feed! + # end + # end + # + # Specify either an argument or a block. + def queue_with_priority(priority=nil, &block) + if block_given? + self.priority = block + else + self.priority = priority + end + end + end + + included do + class_attribute :priority, instance_accessor: false + + self.priority = default_priority + end + + # Returns the priority that the job will be created with + def priority + if @priority.is_a?(Proc) + @priority = instance_exec(&@priority) + end + @priority + end + + end +end diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb index 200d82838e..44ddfa5f69 100644 --- a/activejob/lib/active_job/test_helper.rb +++ b/activejob/lib/active_job/test_helper.rb @@ -226,20 +226,22 @@ module ActiveJob # assert_enqueued_with(job: MyJob, args: [1,2,3], queue: 'low') do # MyJob.perform_later(1,2,3) # end + # + # assert_enqueued_with(job: MyJob, at: Date.tomorrow.noon) do + # MyJob.set(wait_until: Date.tomorrow.noon).perform_later + # end # end - def assert_enqueued_with(args = {}, &_block) - original_enqueued_jobs = enqueued_jobs.dup - clear_enqueued_jobs + def assert_enqueued_with(args = {}) + original_enqueued_jobs_count = enqueued_jobs.count args.assert_valid_keys(:job, :args, :at, :queue) serialized_args = serialize_args_for_assertion(args) yield - matching_job = enqueued_jobs.find do |job| + in_block_jobs = enqueued_jobs.drop(original_enqueued_jobs_count) + matching_job = in_block_jobs.find do |job| serialized_args.all? { |key, value| value == job[key] } end assert matching_job, "No enqueued job found with #{args}" instantiate_job(matching_job) - ensure - queue_adapter.enqueued_jobs = original_enqueued_jobs + enqueued_jobs end # Asserts that the job passed in the block has been performed with the given arguments. @@ -248,20 +250,22 @@ module ActiveJob # assert_performed_with(job: MyJob, args: [1,2,3], queue: 'high') do # MyJob.perform_later(1,2,3) # end + # + # assert_performed_with(job: MyJob, at: Date.tomorrow.noon) do + # MyJob.set(wait_until: Date.tomorrow.noon).perform_later + # end # end - def assert_performed_with(args = {}, &_block) - original_performed_jobs = performed_jobs.dup - clear_performed_jobs + def assert_performed_with(args = {}) + original_performed_jobs_count = performed_jobs.count args.assert_valid_keys(:job, :args, :at, :queue) serialized_args = serialize_args_for_assertion(args) perform_enqueued_jobs { yield } - matching_job = performed_jobs.find do |job| + in_block_jobs = performed_jobs.drop(original_performed_jobs_count) + matching_job = in_block_jobs.find do |job| serialized_args.all? { |key, value| value == job[key] } end assert matching_job, "No performed job found with #{args}" instantiate_job(matching_job) - ensure - queue_adapter.performed_jobs = original_performed_jobs + performed_jobs end def perform_enqueued_jobs(only: nil) @@ -290,31 +294,30 @@ module ActiveJob to: :queue_adapter private - def clear_enqueued_jobs + def clear_enqueued_jobs # :nodoc: enqueued_jobs.clear end - def clear_performed_jobs + def clear_performed_jobs # :nodoc: performed_jobs.clear end - def enqueued_jobs_size(only: nil) + def enqueued_jobs_size(only: nil) # :nodoc: if only - enqueued_jobs.select { |job| job.fetch(:job) == only }.size + enqueued_jobs.count { |job| Array(only).include?(job.fetch(:job)) } else - enqueued_jobs.size + enqueued_jobs.count end end - def serialize_args_for_assertion(args) - serialized_args = args.dup - if job_args = serialized_args.delete(:args) - serialized_args[:args] = ActiveJob::Arguments.serialize(job_args) + def serialize_args_for_assertion(args) # :nodoc: + args.dup.tap do |serialized_args| + serialized_args[:args] = ActiveJob::Arguments.serialize(serialized_args[:args]) if serialized_args[:args] + serialized_args[:at] = serialized_args[:at].to_f if serialized_args[:at] end - serialized_args end - def instantiate_job(payload) + def instantiate_job(payload) # :nodoc: job = payload[:job].new(*payload[:args]) job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at) job.queue_name = payload[:queue] |