From a66780bfff40ddaa1b2eed2f5f2e8b077558d761 Mon Sep 17 00:00:00 2001
From: Jeremy Daer <jeremydaer@gmail.com>
Date: Sun, 28 Feb 2016 11:45:16 -0700
Subject: Active Job: smaller footprint for the dev/test async adapter

Use one shared worker pool for all queues with 0-#CPU workers rather
than separate pools per queue with 2-10*#CPU workers each.
---
 activejob/CHANGELOG.md                             |   7 ++
 activejob/lib/active_job.rb                        |   1 -
 activejob/lib/active_job/async_job.rb              |  77 ---------------
 .../lib/active_job/queue_adapters/async_adapter.rb | 105 +++++++++++++++++++--
 activejob/test/adapters/async.rb                   |   4 +-
 activejob/test/cases/async_job_test.rb             |  42 ---------
 activejob/test/integration/queuing_test.rb         |   4 +-
 .../test/support/integration/adapters/async.rb     |   3 +-
 8 files changed, 110 insertions(+), 133 deletions(-)
 delete mode 100644 activejob/lib/active_job/async_job.rb
 delete mode 100644 activejob/test/cases/async_job_test.rb

(limited to 'activejob')

diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md
index 913164fbc5..1e03c27535 100644
--- a/activejob/CHANGELOG.md
+++ b/activejob/CHANGELOG.md
@@ -1,3 +1,10 @@
+*   Tune the async adapter for low-footprint dev/test usage. Use a single
+    thread pool for all queues and limit to 0 to #CPU total threads, down from
+    2 to 10*#CPU per queue.
+
+    *Jeremy Daer*
+
+
 ## Rails 5.0.0.beta3 (February 24, 2016) ##
 
 *   Change the default adapter from inline to async. It's a better default as tests will then not mistakenly
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb
index f7e05f7cd3..4b9065cf50 100644
--- a/activejob/lib/active_job.rb
+++ b/activejob/lib/active_job.rb
@@ -32,7 +32,6 @@ module ActiveJob
   autoload :Base
   autoload :QueueAdapters
   autoload :ConfiguredJob
-  autoload :AsyncJob
   autoload :TestCase
   autoload :TestHelper
 end
diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb
deleted file mode 100644
index 417ace21d2..0000000000
--- a/activejob/lib/active_job/async_job.rb
+++ /dev/null
@@ -1,77 +0,0 @@
-require 'concurrent/map'
-require 'concurrent/scheduled_task'
-require 'concurrent/executor/thread_pool_executor'
-require 'concurrent/utility/processor_counter'
-
-module ActiveJob
-  # == Active Job Async Job
-  #
-  # When enqueuing jobs with Async Job each job will be executed asynchronously
-  # on a +concurrent-ruby+ thread pool. All job data is retained in memory.
-  # Because job data is not saved to a persistent datastore there is no
-  # additional infrastructure needed and jobs process quickly. The lack of
-  # persistence, however, means that all unprocessed jobs will be lost on
-  # application restart. Therefore in-memory queue adapters are unsuitable for
-  # most production environments but are excellent for development and testing.
-  #
-  # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
-  #
-  # To use Async Job set the queue_adapter config to +:async+.
-  #
-  #   Rails.application.config.active_job.queue_adapter = :async
-  #
-  # Async Job supports job queues specified with +queue_as+. Queues are created
-  # automatically as needed and each has its own thread pool.
-  class AsyncJob
-
-    DEFAULT_EXECUTOR_OPTIONS = {
-      min_threads:     [2, Concurrent.processor_count].max,
-      max_threads:     Concurrent.processor_count * 10,
-      auto_terminate:  true,
-      idletime:        60, # 1 minute
-      max_queue:       0, # unlimited
-      fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
-    }.freeze
-
-    QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
-      hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
-    end
-
-    class << self
-      # Forces jobs to process immediately when testing the Active Job gem.
-      # This should only be called from within unit tests.
-      def perform_immediately! #:nodoc:
-        @perform_immediately = true
-      end
-
-      # Allows jobs to run asynchronously when testing the Active Job gem.
-      # This should only be called from within unit tests.
-      def perform_asynchronously! #:nodoc:
-        @perform_immediately = false
-      end
-
-      def create_thread_pool #:nodoc:
-        if @perform_immediately
-          Concurrent::ImmediateExecutor.new
-        else
-          Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
-        end
-      end
-
-      def enqueue(job_data, queue: 'default') #:nodoc:
-        QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) }
-      end
-
-      def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
-        delay = timestamp - Time.current.to_f
-        if delay > 0
-          Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job|
-            ActiveJob::Base.execute(job)
-          end
-        else
-          enqueue(job_data, queue: queue)
-        end
-      end
-    end
-  end
-end
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb
index 3d3c749883..922bc4afce 100644
--- a/activejob/lib/active_job/queue_adapters/async_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb
@@ -1,22 +1,113 @@
-require 'active_job/async_job'
+require 'securerandom'
+require 'concurrent/scheduled_task'
+require 'concurrent/executor/thread_pool_executor'
+require 'concurrent/utility/processor_counter'
 
 module ActiveJob
   module QueueAdapters
     # == Active Job Async adapter
     #
-    # When enqueuing jobs with the Async adapter the job will be executed
-    # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html].
+    # The Async adapter runs jobs with an in-process thread pool.
     #
-    # To use +AsyncJob+ set the queue_adapter config to +:async+.
+    # This is the default queue adapter. It's well-suited for dev/test since
+    # it doesn't need an external infrastructure, but it's a poor fit for
+    # production since it drops pending jobs on restart.
     #
-    #   Rails.application.config.active_job.queue_adapter = :async
+    # To use this adapter, set queue adapter to +:async+:
+    #
+    #   config.active_job.queue_adapter = :async
+    #
+    # To configure the adapter's thread pool, instantiate the adapter and
+    # pass your own config:
+    #
+    #   config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
+    #     min_threads: 1,
+    #     max_threads: 2 * Concurrent.processor_count,
+    #     idletime: 600.seconds
+    #
+    # The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
+    # jobs. Since jobs share a single thread pool, long-running jobs will block
+    # short-lived jobs. Fine for dev/test; bad for production.
     class AsyncAdapter
+      # See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options.
+      def initialize(**executor_options)
+        @scheduler = Scheduler.new(**executor_options)
+      end
+
       def enqueue(job) #:nodoc:
-        ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name)
+        @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
       end
 
       def enqueue_at(job, timestamp) #:nodoc:
-        ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name)
+        @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
+      end
+
+      # Gracefully stop processing jobs. Finishes in-progress work and handles
+      # any new jobs following the executor's fallback policy (`caller_runs`).
+      # Waits for termination by default. Pass `wait: false` to continue.
+      def shutdown(wait: true) #:nodoc:
+        @scheduler.shutdown wait: wait
+      end
+
+      # Used for our test suite.
+      def immediate=(immediate) #:nodoc:
+        @scheduler.immediate = immediate
+      end
+
+      # Note that we don't actually need to serialize the jobs since we're
+      # performing them in-process, but we do so anyway for parity with other
+      # adapters and deployment environments. Otherwise, serialization bugs
+      # may creep in undetected.
+      class JobWrapper #:nodoc:
+        def initialize(job)
+          job.provider_job_id = SecureRandom.uuid
+          @job_data = job.serialize
+        end
+
+        def perform
+          Base.execute @job_data
+        end
+      end
+
+      class Scheduler #:nodoc:
+        DEFAULT_EXECUTOR_OPTIONS = {
+          min_threads:     0,
+          max_threads:     Concurrent.processor_count,
+          auto_terminate:  true,
+          idletime:        60, # 1 minute
+          max_queue:       0, # unlimited
+          fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
+        }.freeze
+
+        attr_accessor :immediate
+
+        def initialize(**options)
+          self.immediate = false
+          @immediate_executor = Concurrent::ImmediateExecutor.new
+          @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
+        end
+
+        def enqueue(job, queue_name:)
+          executor.post(job, &:perform)
+        end
+
+        def enqueue_at(job, timestamp, queue_name:)
+          delay = timestamp - Time.current.to_f
+          if delay > 0
+            Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
+          else
+            enqueue(job, queue_name: queue_name)
+          end
+        end
+
+        def shutdown(wait: true)
+          @async_executor.shutdown
+          @async_executor.wait_for_termination if wait
+        end
+
+        def executor
+          immediate ? @immediate_executor : @async_executor
+        end
       end
     end
   end
diff --git a/activejob/test/adapters/async.rb b/activejob/test/adapters/async.rb
index 5fcfb89566..08eb9658cd 100644
--- a/activejob/test/adapters/async.rb
+++ b/activejob/test/adapters/async.rb
@@ -1,4 +1,2 @@
-require 'active_job/async_job'
-
 ActiveJob::Base.queue_adapter = :async
-ActiveJob::AsyncJob.perform_immediately!
+ActiveJob::Base.queue_adapter.immediate = true
diff --git a/activejob/test/cases/async_job_test.rb b/activejob/test/cases/async_job_test.rb
deleted file mode 100644
index 2642cfc608..0000000000
--- a/activejob/test/cases/async_job_test.rb
+++ /dev/null
@@ -1,42 +0,0 @@
-require 'helper'
-require 'jobs/hello_job'
-require 'jobs/queue_as_job'
-
-class AsyncJobTest < ActiveSupport::TestCase
-  def using_async_adapter?
-    ActiveJob::Base.queue_adapter.is_a? ActiveJob::QueueAdapters::AsyncAdapter
-  end
-
-  setup do
-    ActiveJob::AsyncJob.perform_asynchronously!
-  end
-
-  teardown do
-    ActiveJob::AsyncJob::QUEUES.clear
-    ActiveJob::AsyncJob.perform_immediately!
-  end
-
-  test "#create_thread_pool returns a thread_pool" do
-    thread_pool = ActiveJob::AsyncJob.create_thread_pool
-    assert thread_pool.is_a? Concurrent::ExecutorService
-    assert_not thread_pool.is_a? Concurrent::ImmediateExecutor
-  end
-
-  test "#create_thread_pool returns an ImmediateExecutor after #perform_immediately! is called" do
-    ActiveJob::AsyncJob.perform_immediately!
-    thread_pool = ActiveJob::AsyncJob.create_thread_pool
-    assert thread_pool.is_a? Concurrent::ImmediateExecutor
-  end
-
-  test "enqueuing without specifying a queue uses the default queue" do
-    skip unless using_async_adapter?
-    HelloJob.perform_later
-    assert ActiveJob::AsyncJob::QUEUES.key? 'default'
-  end
-
-  test "enqueuing to a queue that does not exist creates the queue" do
-    skip unless using_async_adapter?
-    QueueAsJob.perform_later
-    assert ActiveJob::AsyncJob::QUEUES.key? QueueAsJob::MY_QUEUE.to_s
-  end
-end
diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb
index d8425c9706..40f27500a5 100644
--- a/activejob/test/integration/queuing_test.rb
+++ b/activejob/test/integration/queuing_test.rb
@@ -57,13 +57,13 @@ class QueuingTest < ActiveSupport::TestCase
   end
 
   test 'should supply a provider_job_id when available for immediate jobs' do
-    skip unless adapter_is?(:delayed_job, :sidekiq, :qu, :que, :queue_classic)
+    skip unless adapter_is?(:async, :delayed_job, :sidekiq, :qu, :que, :queue_classic)
     test_job = TestJob.perform_later @id
     assert test_job.provider_job_id, 'Provider job id should be set by provider'
   end
 
   test 'should supply a provider_job_id when available for delayed jobs' do
-    skip unless adapter_is?(:delayed_job, :sidekiq, :que, :queue_classic)
+    skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic)
     delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id
     assert delayed_test_job.provider_job_id, 'Provider job id should by set for delayed jobs by provider'
   end
diff --git a/activejob/test/support/integration/adapters/async.rb b/activejob/test/support/integration/adapters/async.rb
index 42beb12b1f..44ab98437a 100644
--- a/activejob/test/support/integration/adapters/async.rb
+++ b/activejob/test/support/integration/adapters/async.rb
@@ -1,9 +1,10 @@
 module AsyncJobsManager
   def setup
     ActiveJob::Base.queue_adapter = :async
+    ActiveJob::Base.queue_adapter.immediate = false
   end
 
   def clear_jobs
-    ActiveJob::AsyncJob::QUEUES.clear
+    ActiveJob::Base.queue_adapter.shutdown
   end
 end
-- 
cgit v1.2.3