aboutsummaryrefslogblamecommitdiffstats
path: root/activejob/lib/active_job/async_job.rb
blob: ed7a6e8d9b68d70aa9e0bad7050a270c96358627 (plain) (tree)
1
2
3
4



                                                  






























                                                                                                
                                                               








































                                                                                                      
require 'concurrent/map'
require 'concurrent/scheduled_task'
require 'concurrent/executor/thread_pool_executor'
require 'concurrent/utility/processor_counter'

module ActiveJob
  # == Active Job Async Job
  #
  # Async Job runs every enqueued job asynchronously on a +concurrent-ruby+
  # thread pool and keeps all job data in memory. Nothing is written to a
  # persistent datastore, so no additional infrastructure is needed and jobs
  # process quickly. The trade-off is that every job which has not been
  # processed yet is lost when the application restarts. In-memory queue
  # adapters are therefore unsuitable for most production environments, but
  # they are excellent for development and testing.
  #
  # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
  #
  # To use Async Job set the queue_adapter config to +:async+.
  #
  #   Rails.application.config.active_job.queue_adapter = :async
  #
  # Job queues specified with +queue_as+ are supported. A queue is created
  # automatically the first time it is needed and gets its own thread pool.
  class AsyncJob
    # Options handed to every Concurrent::ThreadPoolExecutor that
    # +create_thread_pool+ builds.
    DEFAULT_EXECUTOR_OPTIONS = {
      min_threads: [2, Concurrent.processor_count].max,
      max_threads: Concurrent.processor_count * 10,
      auto_terminate: true,
      idletime: 60, # seconds, i.e. one minute
      max_queue: 0, # zero means the queue is unlimited
      fallback_policy: :caller_runs # should never apply while max_queue is 0
    }.freeze

    # Queue name => executor. Reading a name that has no entry yet builds an
    # executor through +create_thread_pool+ and stores it under that name.
    QUEUES = Concurrent::Map.new { |map, name| #:nodoc:
      map.compute_if_absent(name) do
        ActiveJob::AsyncJob.create_thread_pool
      end
    }

    # Forces jobs to process immediately when testing the Active Job gem.
    # This should only be called from within unit tests.
    #
    # The flag is read by +create_thread_pool+, so it only influences
    # executors built after this call; entries already stored in QUEUES keep
    # the executor they were created with.
    def self.perform_immediately! #:nodoc:
      @perform_immediately = true
    end

    # Allows jobs to run asynchronously when testing the Active Job gem.
    # This should only be called from within unit tests.
    #
    # As with +perform_immediately!+, only executors built after this call
    # are affected.
    def self.perform_asynchronously! #:nodoc:
      @perform_immediately = false
    end

    # Builds the executor for a new queue: a Concurrent::ImmediateExecutor
    # when immediate mode has been switched on, otherwise a
    # Concurrent::ThreadPoolExecutor configured with DEFAULT_EXECUTOR_OPTIONS.
    def self.create_thread_pool #:nodoc:
      return Concurrent::ImmediateExecutor.new if @perform_immediately

      Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
    end

    # Posts +job_data+ to the executor of +queue+, where it is passed to
    # ActiveJob::Base.execute. Returns whatever the executor's +post+ returns.
    def self.enqueue(job_data, queue: 'default') #:nodoc:
      executor = QUEUES[queue]
      executor.post(job_data) do |payload|
        ActiveJob::Base.execute(payload)
      end
    end

    # Runs +job_data+ on the executor of +queue+ once +timestamp+ has been
    # reached. +timestamp+ is compared against <tt>Time.current.to_f</tt>;
    # when the resulting delay is not positive the job is handed to +enqueue+
    # right away instead of being scheduled.
    def self.enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
      delay = timestamp - Time.current.to_f
      return enqueue(job_data, queue: queue) unless delay > 0

      Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) { |payload|
        ActiveJob::Base.execute(payload)
      }
    end
  end
end