diff options
Diffstat (limited to 'activesupport/lib')
-rw-r--r-- | activesupport/lib/active_support/queueing.rb | 81 |
1 files changed, 49 insertions, 32 deletions
diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb index f397e1c0c5..d36b5c17a8 100644 --- a/activesupport/lib/active_support/queueing.rb +++ b/activesupport/lib/active_support/queueing.rb @@ -2,17 +2,33 @@ require 'delegate' require 'thread' module ActiveSupport - # A Queue that simply inherits from STDLIB's Queue. Everytime this - # queue is used, Rails automatically sets up a ThreadedConsumer - # to consume it. + # A Queue that simply inherits from STDLIB's Queue. When this + # queue is used, Rails automatically starts a job runner in a + # background thread. class Queue < ::Queue + attr_writer :consumer + + def initialize(consumer_options = {}) + super() + @consumer_options = consumer_options + end + + def consumer + @consumer ||= ThreadedQueueConsumer.new(self, @consumer_options) + end + + # Drain the queue, running all jobs in a different thread. This method + # may not be available on production queues. + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + consumer.drain + end end - class SynchronousQueue < ::Queue + class SynchronousQueue < Queue def push(job) - result = nil - Thread.new { result = job.run }.join - result + super.tap { drain } end alias << push alias enq push @@ -25,7 +41,7 @@ module ActiveSupport # # Jobs are run in a separate thread to catch mistakes where code # assumes that the job is run in the same thread. - class TestQueue < ::Queue + class TestQueue < Queue # Get a list of the jobs off this queue. This method may not be # available on production queues. def jobs @@ -38,14 +54,6 @@ module ActiveSupport def push(job) super Marshal.load(Marshal.dump(job)) end - - # Drain the queue, running all jobs in a different thread. This method - # may not be available on production queues. - def drain - # run the jobs in a separate thread so assumptions of synchronous - # jobs are caught in test mode. - Thread.new { pop.run until empty? }.join - end end # A container for multiple queues. This class delegates to a default Queue @@ -82,25 +90,17 @@ module ActiveSupport # queue and joins the thread, which will ensure that all jobs # are executed before the process finally dies. class ThreadedQueueConsumer - def self.start(queue, logger=nil) - new(queue, logger).start + def self.start(*args) + new(*args).start end - def initialize(queue, logger=nil) - @queue = queue - @logger = logger + def initialize(queue, options = {}) + @queue = queue + @logger = options[:logger] end def start - @thread = Thread.new do - while job = @queue.pop - begin - job.run - rescue Exception => e - handle_exception e - end - end - end + @thread = Thread.new { consume } self end @@ -109,8 +109,25 @@ module ActiveSupport @thread.join end - def handle_exception(e) - @logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger + def drain + Thread.new { run(@queue.pop) until @queue.empty? }.join + end + + def consume + while job = @queue.pop + run job + end + end + + def run(job) + job.run + rescue Exception => exception + handle_exception job, exception + end + + def handle_exception(job, exception) + raise unless @logger + @logger.error "Job Error: #{exception.message}\n#{exception.backtrace.join("\n")}" end end end |