diff options
author | Santiago Pastorino <santiago@wyeworks.com> | 2012-09-13 15:09:15 -0700 |
---|---|---|
committer | Santiago Pastorino <santiago@wyeworks.com> | 2012-09-14 14:10:00 -0700 |
commit | 8577687fcb9da20868a5ea50aea36427270d4485 (patch) | |
tree | 51fa7cb8e0e153da58a9c65d39a862d7ee7a6c2b /activesupport | |
parent | ae00adecf420703bd13f52ffad23eb417b2cf244 (diff) | |
download | rails-8577687fcb9da20868a5ea50aea36427270d4485.tar.gz rails-8577687fcb9da20868a5ea50aea36427270d4485.tar.bz2 rails-8577687fcb9da20868a5ea50aea36427270d4485.zip |
Move queue classes to ActiveSupport
Diffstat (limited to 'activesupport')
-rw-r--r-- | activesupport/lib/active_support/queueing.rb | 116 | ||||
-rw-r--r-- | activesupport/test/queueing/container_test.rb | 28 | ||||
-rw-r--r-- | activesupport/test/queueing/test_queue_test.rb | 102 | ||||
-rw-r--r-- | activesupport/test/queueing/threaded_consumer_test.rb | 98 |
4 files changed, 344 insertions, 0 deletions
diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb new file mode 100644 index 0000000000..f397e1c0c5 --- /dev/null +++ b/activesupport/lib/active_support/queueing.rb @@ -0,0 +1,116 @@ +require 'delegate' +require 'thread' + +module ActiveSupport + # A Queue that simply inherits from STDLIB's Queue. Everytime this + # queue is used, Rails automatically sets up a ThreadedConsumer + # to consume it. + class Queue < ::Queue + end + + class SynchronousQueue < ::Queue + def push(job) + result = nil + Thread.new { result = job.run }.join + result + end + alias << push + alias enq push + end + + # In test mode, the Rails queue is backed by an Array so that assertions + # can be made about its contents. The test queue provides a +jobs+ + # method to make assertions about the queue's contents and a +drain+ + # method to drain the queue and run the jobs. + # + # Jobs are run in a separate thread to catch mistakes where code + # assumes that the job is run in the same thread. + class TestQueue < ::Queue + # Get a list of the jobs off this queue. This method may not be + # available on production queues. + def jobs + @que.dup + end + + # Marshal and unmarshal job before pushing it onto the queue. This will + # raise an exception on any attempts in tests to push jobs that can't (or + # shouldn't) be marshalled. + def push(job) + super Marshal.load(Marshal.dump(job)) + end + + # Drain the queue, running all jobs in a different thread. This method + # may not be available on production queues. + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + Thread.new { pop.run until empty? }.join + end + end + + # A container for multiple queues. This class delegates to a default Queue + # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class + # with multiple queues: + # + # # In your configuration: + # Rails.queue[:image_queue] = SomeQueue.new + # Rails.queue[:mail_queue] = SomeQueue.new + # + # # In your app code: + # Rails.queue[:mail_queue].push SomeJob.new + # + class QueueContainer < DelegateClass(::Queue) + def initialize(default_queue) + @queues = { :default => default_queue } + super(default_queue) + end + + def [](queue_name) + @queues[queue_name] + end + + def []=(queue_name, queue) + @queues[queue_name] = queue + end + end + + # The threaded consumer will run jobs in a background thread in + # development mode or in a VM where running jobs on a thread in + # production mode makes sense. + # + # When the process exits, the consumer pushes a nil onto the + # queue and joins the thread, which will ensure that all jobs + # are executed before the process finally dies. + class ThreadedQueueConsumer + def self.start(queue, logger=nil) + new(queue, logger).start + end + + def initialize(queue, logger=nil) + @queue = queue + @logger = logger + end + + def start + @thread = Thread.new do + while job = @queue.pop + begin + job.run + rescue Exception => e + handle_exception e + end + end + end + self + end + + def shutdown + @queue.push nil + @thread.join + end + + def handle_exception(e) + @logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger + end + end +end diff --git a/activesupport/test/queueing/container_test.rb b/activesupport/test/queueing/container_test.rb new file mode 100644 index 0000000000..7afc11e7a9 --- /dev/null +++ b/activesupport/test/queueing/container_test.rb @@ -0,0 +1,28 @@ +require 'abstract_unit' +require 'active_support/queueing' + +module ActiveSupport + class ContainerTest < ActiveSupport::TestCase + def test_delegates_to_default + q = Queue.new + container = QueueContainer.new q + job = Object.new + + container.push job + assert_equal job, q.pop + end + + def test_access_default + q = Queue.new + container = QueueContainer.new q + assert_equal q, container[:default] + end + + def test_assign_queue + container = QueueContainer.new Object.new + q = Object.new + container[:foo] = q + assert_equal q, container[:foo] + end + end +end diff --git a/activesupport/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb new file mode 100644 index 0000000000..4c08314366 --- /dev/null +++ b/activesupport/test/queueing/test_queue_test.rb @@ -0,0 +1,102 @@ +require 'abstract_unit' +require 'active_support/queueing' + +class TestQueueTest < ActiveSupport::TestCase + def setup + @queue = ActiveSupport::TestQueue.new + end + + class ExceptionRaisingJob + def run + raise + end + end + + def test_drain_raises + @queue.push ExceptionRaisingJob.new + assert_raises(RuntimeError) { @queue.drain } + end + + def test_jobs + @queue.push 1 + @queue.push 2 + assert_equal [1,2], @queue.jobs + end + + class EquivalentJob + def initialize + @initial_id = self.object_id + end + + def run + end + + def ==(other) + other.same_initial_id?(@initial_id) + end + + def same_initial_id?(other_id) + other_id == @initial_id + end + end + + def test_contents + assert @queue.empty? + job = EquivalentJob.new + @queue.push job + refute @queue.empty? + assert_equal job, @queue.pop + end + + class ProcessingJob + def self.clear_processed + @processed = [] + end + + def self.processed + @processed + end + + def initialize(object) + @object = object + end + + def run + self.class.processed << @object + end + end + + def test_order + ProcessingJob.clear_processed + job1 = ProcessingJob.new(1) + job2 = ProcessingJob.new(2) + + @queue.push job1 + @queue.push job2 + @queue.drain + + assert_equal [1,2], ProcessingJob.processed + end + + class ThreadTrackingJob + attr_reader :thread_id + + def run + @thread_id = Thread.current.object_id + end + + def ran? + @thread_id + end + end + + def test_drain + @queue.push ThreadTrackingJob.new + job = @queue.jobs.last + @queue.drain + + assert @queue.empty? + assert job.ran?, "The job runs synchronously when the queue is drained" + assert_not_equal job.thread_id, Thread.current.object_id + end +end diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb new file mode 100644 index 0000000000..20a1cc4e8e --- /dev/null +++ b/activesupport/test/queueing/threaded_consumer_test.rb @@ -0,0 +1,98 @@ +require 'abstract_unit' +require 'active_support/queueing' +require "active_support/log_subscriber/test_helper" + +class TestThreadConsumer < ActiveSupport::TestCase + class Job + attr_reader :id + def initialize(id, &block) + @id = id + @block = block + end + + def run + @block.call if @block + end + end + + def setup + @queue = ActiveSupport::Queue.new + @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new + @consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger) + end + + def teardown + @queue.push nil + end + + test "the jobs are executed" do + ran = false + + job = Job.new(1) do + ran = true + end + + @queue.push job + sleep 0.1 + assert_equal true, ran + end + + test "the jobs are not executed synchronously" do + ran = false + + job = Job.new(1) do + sleep 0.1 + ran = true + end + + @queue.push job + assert_equal false, ran + end + + test "shutting down the queue synchronously drains the jobs" do + ran = false + + job = Job.new(1) do + sleep 0.1 + ran = true + end + + @queue.push job + assert_equal false, ran + + @consumer.shutdown + + assert_equal true, ran + end + + test "log job that raises an exception" do + job = Job.new(1) do + raise "RuntimeError: Error!" + end + + @queue.push job + sleep 0.1 + + assert_equal 1, @logger.logged(:error).size + assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last) + end + + test "test overriding exception handling" do + @consumer.shutdown + @consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do + attr_reader :last_error + def handle_exception(e) + @last_error = e.message + end + end.start(@queue) + + job = Job.new(1) do + raise "RuntimeError: Error!" + end + + @queue.push job + sleep 0.1 + + assert_equal "RuntimeError: Error!", @consumer.last_error + end +end |