aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/queueing.rb
blob: 0a4ab05b78838341d3cd2ee565bf671456053fd9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
require 'delegate'
require 'thread'

module ActiveSupport
  # A Queue that simply inherits from STDLIB's Queue. When this
  # queue is used, Rails automatically starts a job runner in a
  # background thread.
  class Queue < ::Queue
    # Installs +new_consumer+ as the object that #drain delegates to,
    # replacing the lazily built default.
    def consumer=(new_consumer)
      @consumer = new_consumer
    end

    # +consumer_options+ is kept and handed to the default consumer the
    # first time #consumer has to build one.
    def initialize(consumer_options = {})
      super()
      @consumer_options = consumer_options
    end

    # Returns the consumer for this queue. Unless one was assigned through
    # #consumer=, a ThreadedQueueConsumer bound to this queue is created on
    # first use and reused afterwards.
    def consumer
      return @consumer if @consumer

      @consumer = ThreadedQueueConsumer.new(self, @consumer_options)
    end

    # Runs the queued jobs by delegating to the consumer's +drain+. This
    # method may not be available on production queues.
    def drain
      # The consumer decides how the jobs are executed; the default
      # ThreadedQueueConsumer#drain pops and runs them one by one.
      consumer.drain
    end
  end

  class SynchronousQueue < Queue
    # Enqueues +job+ and drains the queue straight away, so the job has
    # been run by the time this method returns. The return value is
    # whatever the superclass +push+ returned.
    def push(job)
      pushed = super
      drain
      pushed
    end

    # Keep the usual queue aliases pointing at the draining +push+.
    alias_method :<<, :push
    alias_method :enq, :push
  end

  # In test mode, the Rails queue is backed by an Array so that assertions
  # can be made about its contents. The test queue provides a +jobs+
  # method to make assertions about the queue's contents and a +drain+
  # method to drain the queue and run the jobs.
  #
  # Jobs are run in a separate thread to catch mistakes where code
  # assumes that the job is run in the same thread.
  class TestQueue < Queue
    # Returns an Array holding the jobs currently waiting on this queue, in
    # the order they would be popped. The queue still contains the same
    # jobs afterwards, and mutating the returned Array does not affect it.
    # This method may not be available on production queues.
    #
    # The snapshot is not atomic: when the queue's internal storage is not
    # directly readable the jobs are briefly taken off and put back, so do
    # not call this while another thread is consuming the queue.
    def jobs
      # The pure-Ruby ::Queue keeps its items in @que. That is a private
      # implementation detail which native Queue implementations do not
      # have, so only rely on it when it is actually there.
      return @que.dup if instance_variable_defined?(:@que)

      pending = []
      begin
        pending << pop(true) until empty?
      rescue ThreadError
        # The queue ran empty between +empty?+ and the non-blocking +pop+.
      end

      # Put the jobs back through ::Queue#push directly so they are not
      # marshalled a second time by the +push+ override below.
      enqueue = ::Queue.instance_method(:push).bind(self)
      pending.each { |job| enqueue.call(job) }
      pending
    end

    # Marshal and unmarshal +job+ before pushing it onto the queue. This
    # raises on any attempt in tests to push a job that can't (or
    # shouldn't) be marshalled, and the queue ends up holding a copy of
    # +job+ rather than the object that was passed in.
    def push(job)
      super Marshal.load(Marshal.dump(job))
    end

    # Route the enqueue aliases through the marshalling +push+ so they
    # cannot be used to bypass the check.
    alias << push
    alias enq push
  end

  # A container for multiple queues. This class delegates to a default Queue
  # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class
  # with multiple queues:
  #
  #   # In your configuration:
  #   Rails.queue[:image_queue] = SomeQueue.new
  #   Rails.queue[:mail_queue]  = SomeQueue.new
  #
  #   # In your app code:
  #   Rails.queue[:mail_queue].push SomeJob.new
  #
  class QueueContainer < DelegateClass(::Queue)
    # Wraps +default_queue+: every delegated queue method is forwarded to
    # it, and it is also registered under the +:default+ name.
    def initialize(default_queue)
      super(default_queue)
      @queues = { default: default_queue }
    end

    # Looks up the queue registered as +queue_name+; nil when no queue has
    # been registered under that name.
    def [](queue_name)
      @queues[queue_name]
    end

    # Registers +queue+ under +queue_name+, replacing any previous entry.
    def []=(queue_name, queue)
      @queues.store(queue_name, queue)
    end
  end

  # The threaded consumer will run jobs in a background thread in
  # development mode or in a VM where running jobs on a thread in
  # production mode makes sense.
  #
  # When the process exits, the consumer pushes a nil onto the
  # queue and joins the thread, which will ensure that all jobs
  # are executed before the process finally dies.
  class ThreadedQueueConsumer
    # Builds a consumer from +args+ (see #initialize), starts its
    # background thread and returns the consumer.
    def self.start(*args)
      new(*args).start
    end

    # +queue+ is the queue jobs are popped from. +options+ may carry a
    # +:logger+; when present, job errors are reported through its +error+
    # method instead of being re-raised.
    def initialize(queue, options = {})
      @queue = queue
      @logger = options[:logger]
    end

    # Spawns the thread that runs #consume and returns +self+.
    def start
      @thread = Thread.new { consume }
      self
    end

    # Stops the background thread: pushes a nil sentinel so #consume
    # leaves its loop once the jobs queued before it have run, then waits
    # for the thread to finish. Returns the joined thread, or nil when no
    # thread is running (never started, or already shut down), in which
    # case nothing is pushed onto the queue.
    def shutdown
      worker, @thread = @thread, nil
      return unless worker

      @queue.push nil
      worker.join
    end

    # Runs the queued jobs in the calling thread until the queue is empty.
    def drain
      run(@queue.pop) until @queue.empty?
    end

    # Pops and runs jobs until a nil (or false) entry is popped. Blocks
    # while the queue is empty.
    def consume
      while job = @queue.pop
        run job
      end
    end

    # Calls +job.run+ and hands anything it raises to #handle_exception.
    def run(job)
      job.run
    rescue Exception => exception
      # Deliberately broad: a failing job of any kind is routed through
      # #handle_exception rather than silently ending the consumer loop.
      handle_exception job, exception
    end

    # Reports a failure raised while running +job+. With a logger the
    # message and backtrace are logged via +error+; without one
    # +exception+ is raised to the caller.
    def handle_exception(job, exception)
      # Raise the exception we were given rather than relying on $!, so
      # this also works when called outside of a rescue clause.
      raise exception unless @logger

      # An exception that was never raised has no backtrace.
      trace = Array(exception.backtrace).join("\n")
      @logger.error "Job Error: #{exception.message}\n#{trace}"
    end
  end
end