diff options
Diffstat (limited to 'activesupport/lib/active_support/queueing.rb')
-rw-r--r-- | activesupport/lib/active_support/queueing.rb | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb new file mode 100644 index 0000000000..0a4ab05b78 --- /dev/null +++ b/activesupport/lib/active_support/queueing.rb @@ -0,0 +1,133 @@ +require 'delegate' +require 'thread' + +module ActiveSupport + # A Queue that simply inherits from STDLIB's Queue. When this + # queue is used, Rails automatically starts a job runner in a + # background thread. + class Queue < ::Queue + attr_writer :consumer + + def initialize(consumer_options = {}) + super() + @consumer_options = consumer_options + end + + def consumer + @consumer ||= ThreadedQueueConsumer.new(self, @consumer_options) + end + + # Drain the queue, running all jobs in a different thread. This method + # may not be available on production queues. + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + consumer.drain + end + end + + class SynchronousQueue < Queue + def push(job) + super.tap { drain } + end + alias << push + alias enq push + end + + # In test mode, the Rails queue is backed by an Array so that assertions + # can be made about its contents. The test queue provides a +jobs+ + # method to make assertions about the queue's contents and a +drain+ + # method to drain the queue and run the jobs. + # + # Jobs are run in a separate thread to catch mistakes where code + # assumes that the job is run in the same thread. + class TestQueue < Queue + # Get a list of the jobs off this queue. This method may not be + # available on production queues. + def jobs + @que.dup + end + + # Marshal and unmarshal job before pushing it onto the queue. This will + # raise an exception on any attempts in tests to push jobs that can't (or + # shouldn't) be marshalled. + def push(job) + super Marshal.load(Marshal.dump(job)) + end + end + + # A container for multiple queues. This class delegates to a default Queue + # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class + # with multiple queues: + # + # # In your configuration: + # Rails.queue[:image_queue] = SomeQueue.new + # Rails.queue[:mail_queue] = SomeQueue.new + # + # # In your app code: + # Rails.queue[:mail_queue].push SomeJob.new + # + class QueueContainer < DelegateClass(::Queue) + def initialize(default_queue) + @queues = { :default => default_queue } + super(default_queue) + end + + def [](queue_name) + @queues[queue_name] + end + + def []=(queue_name, queue) + @queues[queue_name] = queue + end + end + + # The threaded consumer will run jobs in a background thread in + # development mode or in a VM where running jobs on a thread in + # production mode makes sense. + # + # When the process exits, the consumer pushes a nil onto the + # queue and joins the thread, which will ensure that all jobs + # are executed before the process finally dies. + class ThreadedQueueConsumer + def self.start(*args) + new(*args).start + end + + def initialize(queue, options = {}) + @queue = queue + @logger = options[:logger] + end + + def start + @thread = Thread.new { consume } + self + end + + def shutdown + @queue.push nil + @thread.join + end + + def drain + run(@queue.pop) until @queue.empty? + end + + def consume + while job = @queue.pop + run job + end + end + + def run(job) + job.run + rescue Exception => exception + handle_exception job, exception + end + + def handle_exception(job, exception) + raise unless @logger + @logger.error "Job Error: #{exception.message}\n#{exception.backtrace.join("\n")}" + end + end +end |