diff options
author | Kasper Timm Hansen <kaspth@gmail.com> | 2015-08-25 21:37:27 +0200 |
---|---|---|
committer | Kasper Timm Hansen <kaspth@gmail.com> | 2015-08-25 21:37:27 +0200 |
commit | c5a88e50040fd8f7aa5b8d246fc5f8f1c77836fc (patch) | |
tree | 16416424798639dc5470e966c7af60ac8db27522 /activejob/lib | |
parent | 4788d7fd172e4bac76e1614aa48c58c8d4a09155 (diff) | |
parent | 25a4155257cd463355bccb4330b0280e4110de9e (diff) | |
download | rails-c5a88e50040fd8f7aa5b8d246fc5f8f1c77836fc.tar.gz rails-c5a88e50040fd8f7aa5b8d246fc5f8f1c77836fc.tar.bz2 rails-c5a88e50040fd8f7aa5b8d246fc5f8f1c77836fc.zip |
Merge pull request #21257 from jdantonio/async-job
Initial implementation of ActiveJob AsyncAdapter.
Diffstat (limited to 'activejob/lib')
-rw-r--r-- | activejob/lib/active_job.rb | 1 | ||||
-rw-r--r-- | activejob/lib/active_job/async_job.rb | 75 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters.rb | 9 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/async_adapter.rb | 23 |
4 files changed, 108 insertions, 0 deletions
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index 3d4f63b261..eb8091a805 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -32,6 +32,7 @@ module ActiveJob autoload :Base autoload :QueueAdapters autoload :ConfiguredJob + autoload :AsyncJob autoload :TestCase autoload :TestHelper end diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb new file mode 100644 index 0000000000..7fcffc4c24 --- /dev/null +++ b/activejob/lib/active_job/async_job.rb @@ -0,0 +1,75 @@ +require 'concurrent' +require 'thread_safe' + +module ActiveJob + # == Active Job Async Job + # + # When enqueueing jobs with Async Job each job will be executed asynchronously + # on a +concurrent-ruby+ thread pool. All job data is retained in memory. + # Because job data is not saved to a persistent datastore there is no + # additional infrastructure needed and jobs process quickly. The lack of + # persistence, however, means that all unprocessed jobs will be lost on + # application restart. Therefore in-memory queue adapters are unsuitable for + # most production environments but are excellent for development and testing. + # + # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby]. + # + # To use Async Job set the queue_adapter config to +:async+. + # + # Rails.application.config.active_job.queue_adapter = :async + # + # Async Job supports job queues specified with +queue_as+. Queues are created + # automatically as needed and each has its own thread pool. + class AsyncJob + + DEFAULT_EXECUTOR_OPTIONS = { + min_threads: [2, Concurrent.processor_count].max, + max_threads: Concurrent.processor_count * 10, + auto_terminate: true, + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + }.freeze + + QUEUES = ThreadSafe::Cache.new do |hash, queue_name| #:nodoc: + hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool } + end + + class << self + # Forces jobs to process immediately when testing the Active Job gem. + # This should only be called from within unit tests. + def perform_immediately! #:nodoc: + @perform_immediately = true + end + + # Allows jobs to run asynchronously when testing the Active Job gem. + # This should only be called from within unit tests. + def perform_asynchronously! #:nodoc: + @perform_immediately = false + end + + def create_thread_pool #:nodoc: + if @perform_immediately + Concurrent::ImmediateExecutor.new + else + Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS) + end + end + + def enqueue(job_data, queue: 'default') #:nodoc: + QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) } + end + + def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc: + delay = timestamp - Time.current.to_f + if delay > 0 + Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job| + ActiveJob::Base.execute(job) + end + else + enqueue(job_data, queue: queue) + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb index e8ceabaeba..aeb1fe1e73 100644 --- a/activejob/lib/active_job/queue_adapters.rb +++ b/activejob/lib/active_job/queue_adapters.rb @@ -12,6 +12,8 @@ module ActiveJob # * {Sidekiq}[http://sidekiq.org] # * {Sneakers}[https://github.com/jondot/sneakers] # * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch] + # * {Active Job Async Job}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html] + # * {Active Job Inline}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html] # # === Backends Features # @@ -26,6 +28,7 @@ module ActiveJob # | Sidekiq | Yes | Yes | Yes | Queue | No | Job | # | Sneakers | Yes | Yes | No | Queue | Queue | No | # | Sucker Punch | Yes | Yes | No | No | No | No | + # | Active Job Async | Yes | Yes | Yes | No | No | No | # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A | # # ==== Async @@ -96,9 +99,15 @@ module ActiveJob # # N/A: The adapter does not run in a separate process, and therefore doesn't # support retries. + # + # === Async and Inline Queue Adapters + # + # Active Job has two built-in queue adapters intended for development and + # testing: +:async+ and +:inline+. module QueueAdapters extend ActiveSupport::Autoload + autoload :AsyncAdapter autoload :InlineAdapter autoload :BackburnerAdapter autoload :DelayedJobAdapter diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb new file mode 100644 index 0000000000..3fc27f56e7 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -0,0 +1,23 @@ +require 'active_job/async_job' + +module ActiveJob + module QueueAdapters + # == Active Job Async adapter + # + # When enqueueing jobs with the Async adapter the job will be executed + # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]. + # + # To use +AsyncJob+ set the queue_adapter config to +:async+. + # + # Rails.application.config.active_job.queue_adapter = :async + class AsyncAdapter + def enqueue(job) #:nodoc: + ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name) + end + + def enqueue_at(job, timestamp) #:nodoc: + ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name) + end + end + end +end |