aboutsummaryrefslogtreecommitdiffstats
path: root/activejob
diff options
context:
space:
mode:
Diffstat (limited to 'activejob')
-rw-r--r--activejob/CHANGELOG.md5
-rw-r--r--activejob/Rakefile2
-rw-r--r--activejob/lib/active_job.rb1
-rw-r--r--activejob/lib/active_job/async_job.rb75
-rw-r--r--activejob/lib/active_job/queue_adapters.rb9
-rw-r--r--activejob/lib/active_job/queue_adapters/async_adapter.rb23
-rw-r--r--activejob/test/adapters/async.rb5
-rw-r--r--activejob/test/cases/async_job_test.rb42
-rw-r--r--activejob/test/helper.rb1
-rw-r--r--activejob/test/integration/queuing_test.rb2
-rw-r--r--activejob/test/jobs/queue_as_job.rb10
-rw-r--r--activejob/test/support/integration/adapters/async.rb9
12 files changed, 182 insertions, 2 deletions
diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md
index 637096e935..965b556fab 100644
--- a/activejob/CHANGELOG.md
+++ b/activejob/CHANGELOG.md
@@ -1,3 +1,8 @@
+* Implement a simple `AsyncJob` processor and associated `AsyncAdapter` that
+ queue jobs to a `concurrent-ruby` thread pool.
+
+ *Jerry D'Antonio*
+
* Implement `provider_job_id` for `queue_classic` adapter. This requires the
latest, currently unreleased, version of queue_classic.
diff --git a/activejob/Rakefile b/activejob/Rakefile
index 8c86df3c91..d9648a7f16 100644
--- a/activejob/Rakefile
+++ b/activejob/Rakefile
@@ -1,6 +1,6 @@
require 'rake/testtask'
-ACTIVEJOB_ADAPTERS = %w(inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner test)
+ACTIVEJOB_ADAPTERS = %w(async inline delayed_job qu que queue_classic resque sidekiq sneakers sucker_punch backburner test)
ACTIVEJOB_ADAPTERS -= %w(queue_classic) if defined?(JRUBY_VERSION)
task default: :test
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb
index 3d4f63b261..eb8091a805 100644
--- a/activejob/lib/active_job.rb
+++ b/activejob/lib/active_job.rb
@@ -32,6 +32,7 @@ module ActiveJob
autoload :Base
autoload :QueueAdapters
autoload :ConfiguredJob
+ autoload :AsyncJob
autoload :TestCase
autoload :TestHelper
end
diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb
new file mode 100644
index 0000000000..7fcffc4c24
--- /dev/null
+++ b/activejob/lib/active_job/async_job.rb
@@ -0,0 +1,75 @@
+require 'concurrent'
+require 'thread_safe'
+
+module ActiveJob
+ # == Active Job Async Job
+ #
+ # When enqueueing jobs with Async Job each job will be executed asynchronously
+ # on a +concurrent-ruby+ thread pool. All job data is retained in memory.
+ # Because job data is not saved to a persistent datastore there is no
+ # additional infrastructure needed and jobs process quickly. The lack of
+ # persistence, however, means that all unprocessed jobs will be lost on
+ # application restart. Therefore in-memory queue adapters are unsuitable for
+ # most production environments but are excellent for development and testing.
+ #
+ # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
+ #
+ # To use Async Job set the queue_adapter config to +:async+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :async
+ #
+ # Async Job supports job queues specified with +queue_as+. Queues are created
+ # automatically as needed and each has its own thread pool.
+ class AsyncJob
+
+ DEFAULT_EXECUTOR_OPTIONS = {
+ min_threads: [2, Concurrent.processor_count].max,
+ max_threads: Concurrent.processor_count * 10,
+ auto_terminate: true,
+ idletime: 60, # 1 minute
+ max_queue: 0, # unlimited
+ fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
+ }.freeze
+
+ QUEUES = ThreadSafe::Cache.new do |hash, queue_name| #:nodoc:
+ hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
+ end
+
+ class << self
+ # Forces jobs to process immediately when testing the Active Job gem.
+ # This should only be called from within unit tests.
+ def perform_immediately! #:nodoc:
+ @perform_immediately = true
+ end
+
+ # Allows jobs to run asynchronously when testing the Active Job gem.
+ # This should only be called from within unit tests.
+ def perform_asynchronously! #:nodoc:
+ @perform_immediately = false
+ end
+
+ def create_thread_pool #:nodoc:
+ if @perform_immediately
+ Concurrent::ImmediateExecutor.new
+ else
+ Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
+ end
+ end
+
+ def enqueue(job_data, queue: 'default') #:nodoc:
+ QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) }
+ end
+
+ def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
+ delay = timestamp - Time.current.to_f
+ if delay > 0
+ Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job|
+ ActiveJob::Base.execute(job)
+ end
+ else
+ enqueue(job_data, queue: queue)
+ end
+ end
+ end
+ end
+end
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index e8ceabaeba..aeb1fe1e73 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -12,6 +12,8 @@ module ActiveJob
# * {Sidekiq}[http://sidekiq.org]
# * {Sneakers}[https://github.com/jondot/sneakers]
# * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
+ # * {Active Job Async Job}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html]
+ # * {Active Job Inline}[http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html]
#
# === Backends Features
#
@@ -26,6 +28,7 @@ module ActiveJob
# | Sidekiq | Yes | Yes | Yes | Queue | No | Job |
# | Sneakers | Yes | Yes | No | Queue | Queue | No |
# | Sucker Punch | Yes | Yes | No | No | No | No |
+ # | Active Job Async | Yes | Yes | Yes | No | No | No |
# | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
#
# ==== Async
@@ -96,9 +99,15 @@ module ActiveJob
#
# N/A: The adapter does not run in a separate process, and therefore doesn't
# support retries.
+ #
+ # === Async and Inline Queue Adapters
+ #
+ # Active Job has two built-in queue adapters intended for development and
+ # testing: +:async+ and +:inline+.
module QueueAdapters
extend ActiveSupport::Autoload
+ autoload :AsyncAdapter
autoload :InlineAdapter
autoload :BackburnerAdapter
autoload :DelayedJobAdapter
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb
new file mode 100644
index 0000000000..3fc27f56e7
--- /dev/null
+++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb
@@ -0,0 +1,23 @@
+require 'active_job/async_job'
+
+module ActiveJob
+ module QueueAdapters
+ # == Active Job Async adapter
+ #
+ # When enqueueing jobs with the Async adapter the job will be executed
+ # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html].
+ #
+ # To use +AsyncJob+ set the queue_adapter config to +:async+.
+ #
+ # Rails.application.config.active_job.queue_adapter = :async
+ class AsyncAdapter
+ def enqueue(job) #:nodoc:
+ ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name)
+ end
+
+ def enqueue_at(job, timestamp) #:nodoc:
+ ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name)
+ end
+ end
+ end
+end
diff --git a/activejob/test/adapters/async.rb b/activejob/test/adapters/async.rb
new file mode 100644
index 0000000000..df58027599
--- /dev/null
+++ b/activejob/test/adapters/async.rb
@@ -0,0 +1,5 @@
+require 'concurrent'
+require 'active_job/async_job'
+
+ActiveJob::Base.queue_adapter = :async
+ActiveJob::AsyncJob.perform_immediately!
diff --git a/activejob/test/cases/async_job_test.rb b/activejob/test/cases/async_job_test.rb
new file mode 100644
index 0000000000..2642cfc608
--- /dev/null
+++ b/activejob/test/cases/async_job_test.rb
@@ -0,0 +1,42 @@
+require 'helper'
+require 'jobs/hello_job'
+require 'jobs/queue_as_job'
+
+class AsyncJobTest < ActiveSupport::TestCase
+ def using_async_adapter?
+ ActiveJob::Base.queue_adapter.is_a? ActiveJob::QueueAdapters::AsyncAdapter
+ end
+
+ setup do
+ ActiveJob::AsyncJob.perform_asynchronously!
+ end
+
+ teardown do
+ ActiveJob::AsyncJob::QUEUES.clear
+ ActiveJob::AsyncJob.perform_immediately!
+ end
+
+ test "#create_thread_pool returns a thread_pool" do
+ thread_pool = ActiveJob::AsyncJob.create_thread_pool
+ assert thread_pool.is_a? Concurrent::ExecutorService
+ assert_not thread_pool.is_a? Concurrent::ImmediateExecutor
+ end
+
+ test "#create_thread_pool returns an ImmediateExecutor after #perform_immediately! is called" do
+ ActiveJob::AsyncJob.perform_immediately!
+ thread_pool = ActiveJob::AsyncJob.create_thread_pool
+ assert thread_pool.is_a? Concurrent::ImmediateExecutor
+ end
+
+ test "enqueuing without specifying a queue uses the default queue" do
+ skip unless using_async_adapter?
+ HelloJob.perform_later
+ assert ActiveJob::AsyncJob::QUEUES.key? 'default'
+ end
+
+ test "enqueuing to a queue that does not exist creates the queue" do
+ skip unless using_async_adapter?
+ QueueAsJob.perform_later
+ assert ActiveJob::AsyncJob::QUEUES.key? QueueAsJob::MY_QUEUE.to_s
+ end
+end
diff --git a/activejob/test/helper.rb b/activejob/test/helper.rb
index 57907042d9..55f690bda8 100644
--- a/activejob/test/helper.rb
+++ b/activejob/test/helper.rb
@@ -3,6 +3,7 @@ require File.expand_path('../../../load_paths', __FILE__)
require 'active_job'
require 'support/job_buffer'
+ActiveSupport.halt_callback_chains_on_return_false = false
GlobalID.app = 'aj'
@adapter = ENV['AJ_ADAPTER'] || 'inline'
diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb
index ca8047ef0b..125ba54302 100644
--- a/activejob/test/integration/queuing_test.rb
+++ b/activejob/test/integration/queuing_test.rb
@@ -11,7 +11,7 @@ class QueuingTest < ActiveSupport::TestCase
end
test 'should not run jobs queued on a non-listening queue' do
- skip if adapter_is?(:inline, :sucker_punch, :que)
+ skip if adapter_is?(:inline, :async, :sucker_punch, :que)
old_queue = TestJob.queue_name
begin
diff --git a/activejob/test/jobs/queue_as_job.rb b/activejob/test/jobs/queue_as_job.rb
new file mode 100644
index 0000000000..897aef52e5
--- /dev/null
+++ b/activejob/test/jobs/queue_as_job.rb
@@ -0,0 +1,10 @@
+require_relative '../support/job_buffer'
+
+class QueueAsJob < ActiveJob::Base
+ MY_QUEUE = :low_priority
+ queue_as MY_QUEUE
+
+ def perform(greeter = "David")
+ JobBuffer.add("#{greeter} says hello")
+ end
+end
diff --git a/activejob/test/support/integration/adapters/async.rb b/activejob/test/support/integration/adapters/async.rb
new file mode 100644
index 0000000000..42beb12b1f
--- /dev/null
+++ b/activejob/test/support/integration/adapters/async.rb
@@ -0,0 +1,9 @@
+module AsyncJobsManager
+ def setup
+ ActiveJob::Base.queue_adapter = :async
+ end
+
+ def clear_jobs
+ ActiveJob::AsyncJob::QUEUES.clear
+ end
+end