diff options
author | Jerry D'Antonio <stumpjumper@gmail.com> | 2015-08-15 23:41:33 -0400 |
---|---|---|
committer | Jerry D'Antonio <stumpjumper@gmail.com> | 2015-08-25 14:22:11 -0400 |
commit | 25a4155257cd463355bccb4330b0280e4110de9e (patch) | |
tree | b58198818bb9ceadb4932e56f7c89c8b648ab4fb | |
parent | e6629257f4aecfa53fd1a1a2eae34aa33f637d79 (diff) | |
download | rails-25a4155257cd463355bccb4330b0280e4110de9e.tar.gz rails-25a4155257cd463355bccb4330b0280e4110de9e.tar.bz2 rails-25a4155257cd463355bccb4330b0280e4110de9e.zip |
Initial implementation of ActiveJob AsyncAdapter.
-rw-r--r-- | Gemfile.lock | 2 | ||||
-rw-r--r-- | activejob/CHANGELOG.md | 5 | ||||
-rw-r--r-- | activejob/Rakefile | 2 | ||||
-rw-r--r-- | activejob/lib/active_job.rb | 1 | ||||
-rw-r--r-- | activejob/lib/active_job/async_job.rb | 75 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters.rb | 9 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/async_adapter.rb | 23 | ||||
-rw-r--r-- | activejob/test/adapters/async.rb | 5 | ||||
-rw-r--r-- | activejob/test/cases/async_job_test.rb | 42 | ||||
-rw-r--r-- | activejob/test/integration/queuing_test.rb | 2 | ||||
-rw-r--r-- | activejob/test/jobs/queue_as_job.rb | 10 | ||||
-rw-r--r-- | activejob/test/support/integration/adapters/async.rb | 9 | ||||
-rw-r--r-- | activesupport/activesupport.gemspec | 2 |
13 files changed, 183 insertions, 4 deletions
diff --git a/Gemfile.lock b/Gemfile.lock index 7cd4832b3a..4d9f39c3e7 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -118,7 +118,7 @@ PATH activesupport (= 5.0.0.alpha) arel (= 7.0.0.alpha) activesupport (5.0.0.alpha) - concurrent-ruby (~> 0.9.0) + concurrent-ruby (~> 0.9.1) i18n (~> 0.7) json (~> 1.7, >= 1.7.7) method_source diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index 637096e935..965b556fab 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1,3 +1,8 @@ +* Implement a simple `AsyncJob` processor and associated `AsyncAdapter` that + queue jobs to a `concurrent-ruby` thread pool. + + *Jerry D'Antonio* + * Implement `provider_job_id` for `queue_classic` adapter. This requires the latest, currently unreleased, version of queue_classic. diff --git a/activejob/Rakefile b/activejob/Rakefile index 8c86df3c91..d9648a7f16 100644 --- a/activejob/Rakefile +++ b/activejob/Rakefile @@ -1,6 +1,6 @@ require 'rake/testtask' -ACTIVEJOB_ADAPTERS = %w(inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner test) +ACTIVEJOB_ADAPTERS = %w(async inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner test) ACTIVEJOB_ADAPTERS -= %w(queue_classic) if defined?(JRUBY_VERSION) task default: :test diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index 3d4f63b261..eb8091a805 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -32,6 +32,7 @@ module ActiveJob autoload :Base autoload :QueueAdapters autoload :ConfiguredJob + autoload :AsyncJob autoload :TestCase autoload :TestHelper end diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb new file mode 100644 index 0000000000..7fcffc4c24 --- /dev/null +++ b/activejob/lib/active_job/async_job.rb @@ -0,0 +1,75 @@ +require 'concurrent' +require 'thread_safe' + +module ActiveJob + # == Active Job Async Job + # + # When enqueueing jobs with Async Job each job will be executed asynchronously + # on a +concurrent-ruby+ thread pool. All job data is retained in memory. + # Because job data is not saved to a persistent datastore there is no + # additional infrastructure needed and jobs process quickly. The lack of + # persistence, however, means that all unprocessed jobs will be lost on + # application restart. Therefore in-memory queue adapters are unsuitable for + # most production environments but are excellent for development and testing. + # + # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby]. + # + # To use Async Job set the queue_adapter config to +:async+. + # + # Rails.application.config.active_job.queue_adapter = :async + # + # Async Job supports job queues specified with +queue_as+. Queues are created + # automatically as needed and each has its own thread pool. + class AsyncJob + + DEFAULT_EXECUTOR_OPTIONS = { + min_threads: [2, Concurrent.processor_count].max, + max_threads: Concurrent.processor_count * 10, + auto_terminate: true, + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + }.freeze + + QUEUES = ThreadSafe::Cache.new do |hash, queue_name| #:nodoc: + hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool } + end + + class << self + # Forces jobs to process immediately when testing the Active Job gem. + # This should only be called from within unit tests. + def perform_immediately! #:nodoc: + @perform_immediately = true + end + + # Allows jobs to run asynchronously when testing the Active Job gem. + # This should only be called from within unit tests. + def perform_asynchronously! #:nodoc: + @perform_immediately = false + end + + def create_thread_pool #:nodoc: + if @perform_immediately + Concurrent::ImmediateExecutor.new + else + Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS) + end + end + + def enqueue(job_data, queue: 'default') #:nodoc: + QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) } + end + + def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc: + delay = timestamp - Time.current.to_f + if delay > 0 + Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job| + ActiveJob::Base.execute(job) + end + else + enqueue(job_data, queue: queue) + end + end + end + end +end diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb index e8ceabaeba..aeb1fe1e73 100644 --- a/activejob/lib/active_job/queue_adapters.rb +++ b/activejob/lib/active_job/queue_adapters.rb @@ -12,6 +12,8 @@ module ActiveJob # * {Sidekiq}[http://sidekiq.org] # * {Sneakers}[https://github.com/jondot/sneakers] # * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch] + # * {Active Job Async Job}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html] + # * {Active Job Inline}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html] # # === Backends Features # @@ -26,6 +28,7 @@ module ActiveJob # | Sidekiq | Yes | Yes | Yes | Queue | No | Job | # | Sneakers | Yes | Yes | No | Queue | Queue | No | # | Sucker Punch | Yes | Yes | No | No | No | No | + # | Active Job Async | Yes | Yes | Yes | No | No | No | # | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A | # # ==== Async @@ -96,9 +99,15 @@ module ActiveJob # # N/A: The adapter does not run in a separate process, and therefore doesn't # support retries. + # + # === Async and Inline Queue Adapters + # + # Active Job has two built-in queue adapters intended for development and + # testing: +:async+ and +:inline+. module QueueAdapters extend ActiveSupport::Autoload + autoload :AsyncAdapter autoload :InlineAdapter autoload :BackburnerAdapter autoload :DelayedJobAdapter diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb new file mode 100644 index 0000000000..3fc27f56e7 --- /dev/null +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -0,0 +1,23 @@ +require 'active_job/async_job' + +module ActiveJob + module QueueAdapters + # == Active Job Async adapter + # + # When enqueueing jobs with the Async adapter the job will be executed + # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]. + # + # To use +AsyncJob+ set the queue_adapter config to +:async+. + # + # Rails.application.config.active_job.queue_adapter = :async + class AsyncAdapter + def enqueue(job) #:nodoc: + ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name) + end + + def enqueue_at(job, timestamp) #:nodoc: + ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name) + end + end + end +end diff --git a/activejob/test/adapters/async.rb b/activejob/test/adapters/async.rb new file mode 100644 index 0000000000..df58027599 --- /dev/null +++ b/activejob/test/adapters/async.rb @@ -0,0 +1,5 @@ +require 'concurrent' +require 'active_job/async_job' + +ActiveJob::Base.queue_adapter = :async +ActiveJob::AsyncJob.perform_immediately! diff --git a/activejob/test/cases/async_job_test.rb b/activejob/test/cases/async_job_test.rb new file mode 100644 index 0000000000..2642cfc608 --- /dev/null +++ b/activejob/test/cases/async_job_test.rb @@ -0,0 +1,42 @@ +require 'helper' +require 'jobs/hello_job' +require 'jobs/queue_as_job' + +class AsyncJobTest < ActiveSupport::TestCase + def using_async_adapter? + ActiveJob::Base.queue_adapter.is_a? ActiveJob::QueueAdapters::AsyncAdapter + end + + setup do + ActiveJob::AsyncJob.perform_asynchronously! + end + + teardown do + ActiveJob::AsyncJob::QUEUES.clear + ActiveJob::AsyncJob.perform_immediately! + end + + test "#create_thread_pool returns a thread_pool" do + thread_pool = ActiveJob::AsyncJob.create_thread_pool + assert thread_pool.is_a? Concurrent::ExecutorService + assert_not thread_pool.is_a? Concurrent::ImmediateExecutor + end + + test "#create_thread_pool returns an ImmediateExecutor after #perform_immediately! is called" do + ActiveJob::AsyncJob.perform_immediately! + thread_pool = ActiveJob::AsyncJob.create_thread_pool + assert thread_pool.is_a? Concurrent::ImmediateExecutor + end + + test "enqueuing without specifying a queue uses the default queue" do + skip unless using_async_adapter? + HelloJob.perform_later + assert ActiveJob::AsyncJob::QUEUES.key? 'default' + end + + test "enqueuing to a queue that does not exist creates the queue" do + skip unless using_async_adapter? + QueueAsJob.perform_later + assert ActiveJob::AsyncJob::QUEUES.key? QueueAsJob::MY_QUEUE.to_s + end +end diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb index ca8047ef0b..125ba54302 100644 --- a/activejob/test/integration/queuing_test.rb +++ b/activejob/test/integration/queuing_test.rb @@ -11,7 +11,7 @@ class QueuingTest < ActiveSupport::TestCase end test 'should not run jobs queued on a non-listening queue' do - skip if adapter_is?(:inline, :sucker_punch, :que) + skip if adapter_is?(:inline, :async, :sucker_punch, :que) old_queue = TestJob.queue_name begin diff --git a/activejob/test/jobs/queue_as_job.rb b/activejob/test/jobs/queue_as_job.rb new file mode 100644 index 0000000000..897aef52e5 --- /dev/null +++ b/activejob/test/jobs/queue_as_job.rb @@ -0,0 +1,10 @@ +require_relative '../support/job_buffer' + +class QueueAsJob < ActiveJob::Base + MY_QUEUE = :low_priority + queue_as MY_QUEUE + + def perform(greeter = "David") + JobBuffer.add("#{greeter} says hello") + end +end diff --git a/activejob/test/support/integration/adapters/async.rb b/activejob/test/support/integration/adapters/async.rb new file mode 100644 index 0000000000..42beb12b1f --- /dev/null +++ b/activejob/test/support/integration/adapters/async.rb @@ -0,0 +1,9 @@ +module AsyncJobsManager + def setup + ActiveJob::Base.queue_adapter = :async + end + + def clear_jobs + ActiveJob::AsyncJob::QUEUES.clear + end +end diff --git a/activesupport/activesupport.gemspec b/activesupport/activesupport.gemspec index 6d2ae9be76..67b6663915 100644 --- a/activesupport/activesupport.gemspec +++ b/activesupport/activesupport.gemspec @@ -25,6 +25,6 @@ Gem::Specification.new do |s| s.add_dependency 'tzinfo', '~> 1.1' s.add_dependency 'minitest', '~> 5.1' s.add_dependency 'thread_safe','~> 0.3', '>= 0.3.4' - s.add_dependency 'concurrent-ruby', '~> 0.9.0' + s.add_dependency 'concurrent-ruby', '~> 0.9.1' s.add_dependency 'method_source' end |