aboutsummaryrefslogtreecommitdiffstats
path: root/activejob
diff options
context:
space:
mode:
authorJeremy Daer <jeremydaer@gmail.com>2016-02-28 11:45:16 -0700
committerJeremy Daer <jeremydaer@gmail.com>2016-02-29 15:58:26 -0700
commita66780bfff40ddaa1b2eed2f5f2e8b077558d761 (patch)
treea1add8bcd059ecb06b01ca78c15f00f024a3a0ec /activejob
parentb04d07337fd7bc17e88500e9d6bcd361885a45f8 (diff)
downloadrails-a66780bfff40ddaa1b2eed2f5f2e8b077558d761.tar.gz
rails-a66780bfff40ddaa1b2eed2f5f2e8b077558d761.tar.bz2
rails-a66780bfff40ddaa1b2eed2f5f2e8b077558d761.zip
Active Job: smaller footprint for the dev/test async adapter
Use one shared worker pool for all queues with 0-#CPU workers rather than separate pools per queue with 2-10*#CPU workers each.
Diffstat (limited to 'activejob')
-rw-r--r--activejob/CHANGELOG.md7
-rw-r--r--activejob/lib/active_job.rb1
-rw-r--r--activejob/lib/active_job/async_job.rb77
-rw-r--r--activejob/lib/active_job/queue_adapters/async_adapter.rb105
-rw-r--r--activejob/test/adapters/async.rb4
-rw-r--r--activejob/test/cases/async_job_test.rb42
-rw-r--r--activejob/test/integration/queuing_test.rb4
-rw-r--r--activejob/test/support/integration/adapters/async.rb3
8 files changed, 110 insertions, 133 deletions
diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md
index 913164fbc5..1e03c27535 100644
--- a/activejob/CHANGELOG.md
+++ b/activejob/CHANGELOG.md
@@ -1,3 +1,10 @@
+* Tune the async adapter for low-footprint dev/test usage. Use a single
+ thread pool for all queues and limit to 0 to #CPU total threads, down from
+ 2 to 10*#CPU per queue.
+
+ *Jeremy Daer*
+
+
## Rails 5.0.0.beta3 (February 24, 2016) ##
* Change the default adapter from inline to async. It's a better default as tests will then not mistakenly
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb
index f7e05f7cd3..4b9065cf50 100644
--- a/activejob/lib/active_job.rb
+++ b/activejob/lib/active_job.rb
@@ -32,7 +32,6 @@ module ActiveJob
autoload :Base
autoload :QueueAdapters
autoload :ConfiguredJob
- autoload :AsyncJob
autoload :TestCase
autoload :TestHelper
end
diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb
deleted file mode 100644
index 417ace21d2..0000000000
--- a/activejob/lib/active_job/async_job.rb
+++ /dev/null
@@ -1,77 +0,0 @@
-require 'concurrent/map'
-require 'concurrent/scheduled_task'
-require 'concurrent/executor/thread_pool_executor'
-require 'concurrent/utility/processor_counter'
-
-module ActiveJob
- # == Active Job Async Job
- #
- # When enqueuing jobs with Async Job each job will be executed asynchronously
- # on a +concurrent-ruby+ thread pool. All job data is retained in memory.
- # Because job data is not saved to a persistent datastore there is no
- # additional infrastructure needed and jobs process quickly. The lack of
- # persistence, however, means that all unprocessed jobs will be lost on
- # application restart. Therefore in-memory queue adapters are unsuitable for
- # most production environments but are excellent for development and testing.
- #
- # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
- #
- # To use Async Job set the queue_adapter config to +:async+.
- #
- # Rails.application.config.active_job.queue_adapter = :async
- #
- # Async Job supports job queues specified with +queue_as+. Queues are created
- # automatically as needed and each has its own thread pool.
- class AsyncJob
-
- DEFAULT_EXECUTOR_OPTIONS = {
- min_threads: [2, Concurrent.processor_count].max,
- max_threads: Concurrent.processor_count * 10,
- auto_terminate: true,
- idletime: 60, # 1 minute
- max_queue: 0, # unlimited
- fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
- }.freeze
-
- QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
- hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
- end
-
- class << self
- # Forces jobs to process immediately when testing the Active Job gem.
- # This should only be called from within unit tests.
- def perform_immediately! #:nodoc:
- @perform_immediately = true
- end
-
- # Allows jobs to run asynchronously when testing the Active Job gem.
- # This should only be called from within unit tests.
- def perform_asynchronously! #:nodoc:
- @perform_immediately = false
- end
-
- def create_thread_pool #:nodoc:
- if @perform_immediately
- Concurrent::ImmediateExecutor.new
- else
- Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
- end
- end
-
- def enqueue(job_data, queue: 'default') #:nodoc:
- QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) }
- end
-
- def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
- delay = timestamp - Time.current.to_f
- if delay > 0
- Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job|
- ActiveJob::Base.execute(job)
- end
- else
- enqueue(job_data, queue: queue)
- end
- end
- end
- end
-end
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb
index 3d3c749883..922bc4afce 100644
--- a/activejob/lib/active_job/queue_adapters/async_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb
@@ -1,22 +1,113 @@
-require 'active_job/async_job'
+require 'securerandom'
+require 'concurrent/scheduled_task'
+require 'concurrent/executor/thread_pool_executor'
+require 'concurrent/utility/processor_counter'
module ActiveJob
module QueueAdapters
# == Active Job Async adapter
#
- # When enqueuing jobs with the Async adapter the job will be executed
- # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html].
+ # The Async adapter runs jobs with an in-process thread pool.
#
- # To use +AsyncJob+ set the queue_adapter config to +:async+.
+ # This is the default queue adapter. It's well-suited for dev/test since
+ # it doesn't need an external infrastructure, but it's a poor fit for
+ # production since it drops pending jobs on restart.
#
- # Rails.application.config.active_job.queue_adapter = :async
+ # To use this adapter, set queue adapter to +:async+:
+ #
+ # config.active_job.queue_adapter = :async
+ #
+ # To configure the adapter's thread pool, instantiate the adapter and
+ # pass your own config:
+ #
+ # config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
+ # min_threads: 1,
+ # max_threads: 2 * Concurrent.processor_count,
+ # idletime: 600.seconds
+ #
+ # The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
+ # jobs. Since jobs share a single thread pool, long-running jobs will block
+ # short-lived jobs. Fine for dev/test; bad for production.
class AsyncAdapter
+ # See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options.
+ def initialize(**executor_options)
+ @scheduler = Scheduler.new(**executor_options)
+ end
+
def enqueue(job) #:nodoc:
- ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name)
+ @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end
def enqueue_at(job, timestamp) #:nodoc:
- ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name)
+ @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
+ end
+
+ # Gracefully stop processing jobs. Finishes in-progress work and handles
+ # any new jobs following the executor's fallback policy (`caller_runs`).
+ # Waits for termination by default. Pass `wait: false` to continue.
+ def shutdown(wait: true) #:nodoc:
+ @scheduler.shutdown wait: wait
+ end
+
+ # Used for our test suite.
+ def immediate=(immediate) #:nodoc:
+ @scheduler.immediate = immediate
+ end
+
+ # Note that we don't actually need to serialize the jobs since we're
+ # performing them in-process, but we do so anyway for parity with other
+ # adapters and deployment environments. Otherwise, serialization bugs
+ # may creep in undetected.
+ class JobWrapper #:nodoc:
+ def initialize(job)
+ job.provider_job_id = SecureRandom.uuid
+ @job_data = job.serialize
+ end
+
+ def perform
+ Base.execute @job_data
+ end
+ end
+
+ class Scheduler #:nodoc:
+ DEFAULT_EXECUTOR_OPTIONS = {
+ min_threads: 0,
+ max_threads: Concurrent.processor_count,
+ auto_terminate: true,
+ idletime: 60, # 1 minute
+ max_queue: 0, # unlimited
+ fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
+ }.freeze
+
+ attr_accessor :immediate
+
+ def initialize(**options)
+ self.immediate = false
+ @immediate_executor = Concurrent::ImmediateExecutor.new
+ @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
+ end
+
+ def enqueue(job, queue_name:)
+ executor.post(job, &:perform)
+ end
+
+ def enqueue_at(job, timestamp, queue_name:)
+ delay = timestamp - Time.current.to_f
+ if delay > 0
+ Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
+ else
+ enqueue(job, queue_name: queue_name)
+ end
+ end
+
+ def shutdown(wait: true)
+ @async_executor.shutdown
+ @async_executor.wait_for_termination if wait
+ end
+
+ def executor
+ immediate ? @immediate_executor : @async_executor
+ end
end
end
end
diff --git a/activejob/test/adapters/async.rb b/activejob/test/adapters/async.rb
index 5fcfb89566..08eb9658cd 100644
--- a/activejob/test/adapters/async.rb
+++ b/activejob/test/adapters/async.rb
@@ -1,4 +1,2 @@
-require 'active_job/async_job'
-
ActiveJob::Base.queue_adapter = :async
-ActiveJob::AsyncJob.perform_immediately!
+ActiveJob::Base.queue_adapter.immediate = true
diff --git a/activejob/test/cases/async_job_test.rb b/activejob/test/cases/async_job_test.rb
deleted file mode 100644
index 2642cfc608..0000000000
--- a/activejob/test/cases/async_job_test.rb
+++ /dev/null
@@ -1,42 +0,0 @@
-require 'helper'
-require 'jobs/hello_job'
-require 'jobs/queue_as_job'
-
-class AsyncJobTest < ActiveSupport::TestCase
- def using_async_adapter?
- ActiveJob::Base.queue_adapter.is_a? ActiveJob::QueueAdapters::AsyncAdapter
- end
-
- setup do
- ActiveJob::AsyncJob.perform_asynchronously!
- end
-
- teardown do
- ActiveJob::AsyncJob::QUEUES.clear
- ActiveJob::AsyncJob.perform_immediately!
- end
-
- test "#create_thread_pool returns a thread_pool" do
- thread_pool = ActiveJob::AsyncJob.create_thread_pool
- assert thread_pool.is_a? Concurrent::ExecutorService
- assert_not thread_pool.is_a? Concurrent::ImmediateExecutor
- end
-
- test "#create_thread_pool returns an ImmediateExecutor after #perform_immediately! is called" do
- ActiveJob::AsyncJob.perform_immediately!
- thread_pool = ActiveJob::AsyncJob.create_thread_pool
- assert thread_pool.is_a? Concurrent::ImmediateExecutor
- end
-
- test "enqueuing without specifying a queue uses the default queue" do
- skip unless using_async_adapter?
- HelloJob.perform_later
- assert ActiveJob::AsyncJob::QUEUES.key? 'default'
- end
-
- test "enqueuing to a queue that does not exist creates the queue" do
- skip unless using_async_adapter?
- QueueAsJob.perform_later
- assert ActiveJob::AsyncJob::QUEUES.key? QueueAsJob::MY_QUEUE.to_s
- end
-end
diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb
index d8425c9706..40f27500a5 100644
--- a/activejob/test/integration/queuing_test.rb
+++ b/activejob/test/integration/queuing_test.rb
@@ -57,13 +57,13 @@ class QueuingTest < ActiveSupport::TestCase
end
test 'should supply a provider_job_id when available for immediate jobs' do
- skip unless adapter_is?(:delayed_job, :sidekiq, :qu, :que, :queue_classic)
+ skip unless adapter_is?(:async, :delayed_job, :sidekiq, :qu, :que, :queue_classic)
test_job = TestJob.perform_later @id
assert test_job.provider_job_id, 'Provider job id should be set by provider'
end
test 'should supply a provider_job_id when available for delayed jobs' do
- skip unless adapter_is?(:delayed_job, :sidekiq, :que, :queue_classic)
+ skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic)
delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id
assert delayed_test_job.provider_job_id, 'Provider job id should by set for delayed jobs by provider'
end
diff --git a/activejob/test/support/integration/adapters/async.rb b/activejob/test/support/integration/adapters/async.rb
index 42beb12b1f..44ab98437a 100644
--- a/activejob/test/support/integration/adapters/async.rb
+++ b/activejob/test/support/integration/adapters/async.rb
@@ -1,9 +1,10 @@
module AsyncJobsManager
def setup
ActiveJob::Base.queue_adapter = :async
+ ActiveJob::Base.queue_adapter.immediate = false
end
def clear_jobs
- ActiveJob::AsyncJob::QUEUES.clear
+ ActiveJob::Base.queue_adapter.shutdown
end
end