blob: 7c352886f335574bbb93675025858eba16f05913 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
require 'delegate'
require 'thread'
module ActiveSupport
# A Queue that simply inherits from STDLIB's Queue. When this
# queue is used, Rails automatically starts a job runner in a
# background thread.
class Queue < ::Queue
attr_writer :consumer
def initialize(consumer_options = {})
super()
@consumer_options = consumer_options
end
def consumer
@consumer ||= ThreadedQueueConsumer.new(self, @consumer_options)
end
# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
# run the jobs in a separate thread so assumptions of synchronous
# jobs are caught in test mode.
consumer.drain
end
end
class SynchronousQueue < Queue
def push(job)
super.tap { drain }
end
alias << push
alias enq push
end
# In test mode, the Rails queue is backed by an Array so that assertions
# can be made about its contents. The test queue provides a +jobs+
# method to make assertions about the queue's contents and a +drain+
# method to drain the queue and run the jobs.
#
# Jobs are run in a separate thread to catch mistakes where code
# assumes that the job is run in the same thread.
class TestQueue < Queue
# Get a list of the jobs off this queue. This method may not be
# available on production queues.
def jobs
@que.dup
end
# Marshal and unmarshal job before pushing it onto the queue. This will
# raise an exception on any attempts in tests to push jobs that can't (or
# shouldn't) be marshalled.
def push(job)
super Marshal.load(Marshal.dump(job))
end
end
# A container for multiple queues. This class delegates to a default Queue
# so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class
# with multiple queues:
#
# # In your configuration:
# Rails.queue[:image_queue] = SomeQueue.new
# Rails.queue[:mail_queue] = SomeQueue.new
#
# # In your app code:
# Rails.queue[:mail_queue].push SomeJob.new
#
class QueueContainer < DelegateClass(::Queue)
def initialize(default_queue)
@queues = { :default => default_queue }
super(default_queue)
end
def [](queue_name)
@queues[queue_name]
end
def []=(queue_name, queue)
@queues[queue_name] = queue
end
end
# The threaded consumer will run jobs in a background thread in
# development mode or in a VM where running jobs on a thread in
# production mode makes sense.
#
# When the process exits, the consumer pushes a nil onto the
# queue and joins the thread, which will ensure that all jobs
# are executed before the process finally dies.
class ThreadedQueueConsumer
def self.start(*args)
new(*args).start
end
def initialize(queue, options = {})
@queue = queue
@logger = options[:logger]
end
def start
@thread = Thread.new { consume }
self
end
def shutdown
@queue.push nil
@thread.join
end
def drain
run(@queue.pop) until @queue.empty?
end
def consume
while job = @queue.pop
run job
end
end
def run(job)
job.run
rescue Exception => exception
handle_exception job, exception
end
def handle_exception(job, exception)
raise exception unless @logger
@logger.error "Job Error: #{exception.message}\n#{exception.backtrace.join("\n")}"
end
end
end
|