aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib/active_job/async_job.rb
blob: 7fcffc4c24ff628d59039ca89bf7eb1500a1d812 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
require 'concurrent'
require 'thread_safe'

module ActiveJob
  # == Active Job Async Job
  #
  # When enqueueing jobs with Async Job each job will be executed asynchronously
  # on a +concurrent-ruby+ thread pool. All job data is retained in memory.
  # Because job data is not saved to a persistent datastore there is no
  # additional infrastructure needed and jobs process quickly. The lack of
  # persistence, however, means that all unprocessed jobs will be lost on
  # application restart. Therefore in-memory queue adapters are unsuitable for
  # most production environments but are excellent for development and testing.
  #
  # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
  #
  # To use Async Job set the queue_adapter config to +:async+.
  #
  #   Rails.application.config.active_job.queue_adapter = :async
  #
  # Async Job supports job queues specified with +queue_as+. Queues are created
  # automatically as needed and each has its own thread pool.
  class AsyncJob

    DEFAULT_EXECUTOR_OPTIONS = {
      min_threads:     [2, Concurrent.processor_count].max,
      max_threads:     Concurrent.processor_count * 10,
      auto_terminate:  true,
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
    }.freeze

    QUEUES = ThreadSafe::Cache.new do |hash, queue_name| #:nodoc:
      hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
    end

    class << self
      # Forces jobs to process immediately when testing the Active Job gem.
      # This should only be called from within unit tests.
      def perform_immediately! #:nodoc:
        @perform_immediately = true
      end

      # Allows jobs to run asynchronously when testing the Active Job gem.
      # This should only be called from within unit tests.
      def perform_asynchronously! #:nodoc:
        @perform_immediately = false
      end

      def create_thread_pool #:nodoc:
        if @perform_immediately
          Concurrent::ImmediateExecutor.new
        else
          Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
        end
      end

      def enqueue(job_data, queue: 'default') #:nodoc:
        QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) }
      end

      def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
        delay = timestamp - Time.current.to_f
        if delay > 0
          Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job|
            ActiveJob::Base.execute(job)
          end
        else
          enqueue(job_data, queue: queue)
        end
      end
    end
  end
end