diff options
Diffstat (limited to 'activejob/lib/active_job')
-rw-r--r-- | activejob/lib/active_job/async_job.rb | 77 | ||||
-rw-r--r-- | activejob/lib/active_job/callbacks.rb | 5 | ||||
-rw-r--r-- | activejob/lib/active_job/execution.rb | 6 | ||||
-rw-r--r-- | activejob/lib/active_job/queue_adapters/async_adapter.rb | 105 | ||||
-rw-r--r-- | activejob/lib/active_job/railtie.rb | 9 |
5 files changed, 116 insertions, 86 deletions
diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb deleted file mode 100644 index 417ace21d2..0000000000 --- a/activejob/lib/active_job/async_job.rb +++ /dev/null @@ -1,77 +0,0 @@ -require 'concurrent/map' -require 'concurrent/scheduled_task' -require 'concurrent/executor/thread_pool_executor' -require 'concurrent/utility/processor_counter' - -module ActiveJob - # == Active Job Async Job - # - # When enqueuing jobs with Async Job each job will be executed asynchronously - # on a +concurrent-ruby+ thread pool. All job data is retained in memory. - # Because job data is not saved to a persistent datastore there is no - # additional infrastructure needed and jobs process quickly. The lack of - # persistence, however, means that all unprocessed jobs will be lost on - # application restart. Therefore in-memory queue adapters are unsuitable for - # most production environments but are excellent for development and testing. - # - # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby]. - # - # To use Async Job set the queue_adapter config to +:async+. - # - # Rails.application.config.active_job.queue_adapter = :async - # - # Async Job supports job queues specified with +queue_as+. Queues are created - # automatically as needed and each has its own thread pool. - class AsyncJob - - DEFAULT_EXECUTOR_OPTIONS = { - min_threads: [2, Concurrent.processor_count].max, - max_threads: Concurrent.processor_count * 10, - auto_terminate: true, - idletime: 60, # 1 minute - max_queue: 0, # unlimited - fallback_policy: :caller_runs # shouldn't matter -- 0 max queue - }.freeze - - QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc: - hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool } - end - - class << self - # Forces jobs to process immediately when testing the Active Job gem. - # This should only be called from within unit tests. - def perform_immediately! #:nodoc: - @perform_immediately = true - end - - # Allows jobs to run asynchronously when testing the Active Job gem. - # This should only be called from within unit tests. - def perform_asynchronously! #:nodoc: - @perform_immediately = false - end - - def create_thread_pool #:nodoc: - if @perform_immediately - Concurrent::ImmediateExecutor.new - else - Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS) - end - end - - def enqueue(job_data, queue: 'default') #:nodoc: - QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) } - end - - def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc: - delay = timestamp - Time.current.to_f - if delay > 0 - Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job| - ActiveJob::Base.execute(job) - end - else - enqueue(job_data, queue: queue) - end - end - end - end -end diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb index 2b6149e84e..a6591c6a05 100644 --- a/activejob/lib/active_job/callbacks.rb +++ b/activejob/lib/active_job/callbacks.rb @@ -17,6 +17,11 @@ module ActiveJob extend ActiveSupport::Concern include ActiveSupport::Callbacks + class << self + include ActiveSupport::Callbacks + define_callbacks :execute + end + included do define_callbacks :perform define_callbacks :enqueue diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb index 79d232da4a..7c4151fc90 100644 --- a/activejob/lib/active_job/execution.rb +++ b/activejob/lib/active_job/execution.rb @@ -17,8 +17,10 @@ module ActiveJob end def execute(job_data) #:nodoc: - job = deserialize(job_data) - job.perform_now + ActiveJob::Callbacks.run_callbacks(:execute) do + job = deserialize(job_data) + job.perform_now + end end end diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb index 3d3c749883..922bc4afce 100644 --- a/activejob/lib/active_job/queue_adapters/async_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -1,22 +1,113 @@ -require 'active_job/async_job' +require 'securerandom' +require 'concurrent/scheduled_task' +require 'concurrent/executor/thread_pool_executor' +require 'concurrent/utility/processor_counter' module ActiveJob module QueueAdapters # == Active Job Async adapter # - # When enqueuing jobs with the Async adapter the job will be executed - # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]. + # The Async adapter runs jobs with an in-process thread pool. # - # To use +AsyncJob+ set the queue_adapter config to +:async+. + # This is the default queue adapter. It's well-suited for dev/test since + # it doesn't need an external infrastructure, but it's a poor fit for + # production since it drops pending jobs on restart. # - # Rails.application.config.active_job.queue_adapter = :async + # To use this adapter, set queue adapter to +:async+: + # + # config.active_job.queue_adapter = :async + # + # To configure the adapter's thread pool, instantiate the adapter and + # pass your own config: + # + # config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \ + # min_threads: 1, + # max_threads: 2 * Concurrent.processor_count, + # idletime: 600.seconds + # + # The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute + # jobs. Since jobs share a single thread pool, long-running jobs will block + # short-lived jobs. Fine for dev/test; bad for production. class AsyncAdapter + # See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options. + def initialize(**executor_options) + @scheduler = Scheduler.new(**executor_options) + end + def enqueue(job) #:nodoc: - ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name) + @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name end def enqueue_at(job, timestamp) #:nodoc: - ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name) + @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name + end + + # Gracefully stop processing jobs. Finishes in-progress work and handles + # any new jobs following the executor's fallback policy (`caller_runs`). + # Waits for termination by default. Pass `wait: false` to continue. + def shutdown(wait: true) #:nodoc: + @scheduler.shutdown wait: wait + end + + # Used for our test suite. + def immediate=(immediate) #:nodoc: + @scheduler.immediate = immediate + end + + # Note that we don't actually need to serialize the jobs since we're + # performing them in-process, but we do so anyway for parity with other + # adapters and deployment environments. Otherwise, serialization bugs + # may creep in undetected. + class JobWrapper #:nodoc: + def initialize(job) + job.provider_job_id = SecureRandom.uuid + @job_data = job.serialize + end + + def perform + Base.execute @job_data + end + end + + class Scheduler #:nodoc: + DEFAULT_EXECUTOR_OPTIONS = { + min_threads: 0, + max_threads: Concurrent.processor_count, + auto_terminate: true, + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + }.freeze + + attr_accessor :immediate + + def initialize(**options) + self.immediate = false + @immediate_executor = Concurrent::ImmediateExecutor.new + @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options)) + end + + def enqueue(job, queue_name:) + executor.post(job, &:perform) + end + + def enqueue_at(job, timestamp, queue_name:) + delay = timestamp - Time.current.to_f + if delay > 0 + Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform) + else + enqueue(job, queue_name: queue_name) + end + end + + def shutdown(wait: true) + @async_executor.shutdown + @async_executor.wait_for_termination if wait + end + + def executor + immediate ? @immediate_executor : @async_executor + end end end end diff --git a/activejob/lib/active_job/railtie.rb b/activejob/lib/active_job/railtie.rb index b249ec09dd..a47caa4a7e 100644 --- a/activejob/lib/active_job/railtie.rb +++ b/activejob/lib/active_job/railtie.rb @@ -19,5 +19,14 @@ module ActiveJob end end + initializer "active_job.set_reloader_hook" do |app| + ActiveSupport.on_load(:active_job) do + ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, prepend: true) do |_, inner| + app.reloader.wrap do + inner.call + end + end + end + end end end |