aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport
diff options
context:
space:
mode:
authorSantiago Pastorino <santiago@wyeworks.com>2012-09-13 15:09:15 -0700
committerSantiago Pastorino <santiago@wyeworks.com>2012-09-14 14:10:00 -0700
commit8577687fcb9da20868a5ea50aea36427270d4485 (patch)
tree51fa7cb8e0e153da58a9c65d39a862d7ee7a6c2b /activesupport
parentae00adecf420703bd13f52ffad23eb417b2cf244 (diff)
downloadrails-8577687fcb9da20868a5ea50aea36427270d4485.tar.gz
rails-8577687fcb9da20868a5ea50aea36427270d4485.tar.bz2
rails-8577687fcb9da20868a5ea50aea36427270d4485.zip
Move queue classes to ActiveSupport
Diffstat (limited to 'activesupport')
-rw-r--r--activesupport/lib/active_support/queueing.rb116
-rw-r--r--activesupport/test/queueing/container_test.rb28
-rw-r--r--activesupport/test/queueing/test_queue_test.rb102
-rw-r--r--activesupport/test/queueing/threaded_consumer_test.rb98
4 files changed, 344 insertions, 0 deletions
diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb
new file mode 100644
index 0000000000..f397e1c0c5
--- /dev/null
+++ b/activesupport/lib/active_support/queueing.rb
@@ -0,0 +1,116 @@
+require 'delegate'
+require 'thread'
+
+module ActiveSupport
+ # A Queue that simply inherits from STDLIB's Queue. Everytime this
+ # queue is used, Rails automatically sets up a ThreadedConsumer
+ # to consume it.
+ class Queue < ::Queue
+ end
+
+ class SynchronousQueue < ::Queue
+ def push(job)
+ result = nil
+ Thread.new { result = job.run }.join
+ result
+ end
+ alias << push
+ alias enq push
+ end
+
+ # In test mode, the Rails queue is backed by an Array so that assertions
+ # can be made about its contents. The test queue provides a +jobs+
+ # method to make assertions about the queue's contents and a +drain+
+ # method to drain the queue and run the jobs.
+ #
+ # Jobs are run in a separate thread to catch mistakes where code
+ # assumes that the job is run in the same thread.
+ class TestQueue < ::Queue
+ # Get a list of the jobs off this queue. This method may not be
+ # available on production queues.
+ def jobs
+ @que.dup
+ end
+
+ # Marshal and unmarshal job before pushing it onto the queue. This will
+ # raise an exception on any attempts in tests to push jobs that can't (or
+ # shouldn't) be marshalled.
+ def push(job)
+ super Marshal.load(Marshal.dump(job))
+ end
+
+ # Drain the queue, running all jobs in a different thread. This method
+ # may not be available on production queues.
+ def drain
+ # run the jobs in a separate thread so assumptions of synchronous
+ # jobs are caught in test mode.
+ Thread.new { pop.run until empty? }.join
+ end
+ end
+
+ # A container for multiple queues. This class delegates to a default Queue
+ # so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class
+ # with multiple queues:
+ #
+ # # In your configuration:
+ # Rails.queue[:image_queue] = SomeQueue.new
+ # Rails.queue[:mail_queue] = SomeQueue.new
+ #
+ # # In your app code:
+ # Rails.queue[:mail_queue].push SomeJob.new
+ #
+ class QueueContainer < DelegateClass(::Queue)
+ def initialize(default_queue)
+ @queues = { :default => default_queue }
+ super(default_queue)
+ end
+
+ def [](queue_name)
+ @queues[queue_name]
+ end
+
+ def []=(queue_name, queue)
+ @queues[queue_name] = queue
+ end
+ end
+
+ # The threaded consumer will run jobs in a background thread in
+ # development mode or in a VM where running jobs on a thread in
+ # production mode makes sense.
+ #
+ # When the process exits, the consumer pushes a nil onto the
+ # queue and joins the thread, which will ensure that all jobs
+ # are executed before the process finally dies.
+ class ThreadedQueueConsumer
+ def self.start(queue, logger=nil)
+ new(queue, logger).start
+ end
+
+ def initialize(queue, logger=nil)
+ @queue = queue
+ @logger = logger
+ end
+
+ def start
+ @thread = Thread.new do
+ while job = @queue.pop
+ begin
+ job.run
+ rescue Exception => e
+ handle_exception e
+ end
+ end
+ end
+ self
+ end
+
+ def shutdown
+ @queue.push nil
+ @thread.join
+ end
+
+ def handle_exception(e)
+ @logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger
+ end
+ end
+end
diff --git a/activesupport/test/queueing/container_test.rb b/activesupport/test/queueing/container_test.rb
new file mode 100644
index 0000000000..7afc11e7a9
--- /dev/null
+++ b/activesupport/test/queueing/container_test.rb
@@ -0,0 +1,28 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+
+module ActiveSupport
+ class ContainerTest < ActiveSupport::TestCase
+ def test_delegates_to_default
+ q = Queue.new
+ container = QueueContainer.new q
+ job = Object.new
+
+ container.push job
+ assert_equal job, q.pop
+ end
+
+ def test_access_default
+ q = Queue.new
+ container = QueueContainer.new q
+ assert_equal q, container[:default]
+ end
+
+ def test_assign_queue
+ container = QueueContainer.new Object.new
+ q = Object.new
+ container[:foo] = q
+ assert_equal q, container[:foo]
+ end
+ end
+end
diff --git a/activesupport/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb
new file mode 100644
index 0000000000..4c08314366
--- /dev/null
+++ b/activesupport/test/queueing/test_queue_test.rb
@@ -0,0 +1,102 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+
+class TestQueueTest < ActiveSupport::TestCase
+ def setup
+ @queue = ActiveSupport::TestQueue.new
+ end
+
+ class ExceptionRaisingJob
+ def run
+ raise
+ end
+ end
+
+ def test_drain_raises
+ @queue.push ExceptionRaisingJob.new
+ assert_raises(RuntimeError) { @queue.drain }
+ end
+
+ def test_jobs
+ @queue.push 1
+ @queue.push 2
+ assert_equal [1,2], @queue.jobs
+ end
+
+ class EquivalentJob
+ def initialize
+ @initial_id = self.object_id
+ end
+
+ def run
+ end
+
+ def ==(other)
+ other.same_initial_id?(@initial_id)
+ end
+
+ def same_initial_id?(other_id)
+ other_id == @initial_id
+ end
+ end
+
+ def test_contents
+ assert @queue.empty?
+ job = EquivalentJob.new
+ @queue.push job
+ refute @queue.empty?
+ assert_equal job, @queue.pop
+ end
+
+ class ProcessingJob
+ def self.clear_processed
+ @processed = []
+ end
+
+ def self.processed
+ @processed
+ end
+
+ def initialize(object)
+ @object = object
+ end
+
+ def run
+ self.class.processed << @object
+ end
+ end
+
+ def test_order
+ ProcessingJob.clear_processed
+ job1 = ProcessingJob.new(1)
+ job2 = ProcessingJob.new(2)
+
+ @queue.push job1
+ @queue.push job2
+ @queue.drain
+
+ assert_equal [1,2], ProcessingJob.processed
+ end
+
+ class ThreadTrackingJob
+ attr_reader :thread_id
+
+ def run
+ @thread_id = Thread.current.object_id
+ end
+
+ def ran?
+ @thread_id
+ end
+ end
+
+ def test_drain
+ @queue.push ThreadTrackingJob.new
+ job = @queue.jobs.last
+ @queue.drain
+
+ assert @queue.empty?
+ assert job.ran?, "The job runs synchronously when the queue is drained"
+ assert_not_equal job.thread_id, Thread.current.object_id
+ end
+end
diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb
new file mode 100644
index 0000000000..20a1cc4e8e
--- /dev/null
+++ b/activesupport/test/queueing/threaded_consumer_test.rb
@@ -0,0 +1,98 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+require "active_support/log_subscriber/test_helper"
+
+class TestThreadConsumer < ActiveSupport::TestCase
+ class Job
+ attr_reader :id
+ def initialize(id, &block)
+ @id = id
+ @block = block
+ end
+
+ def run
+ @block.call if @block
+ end
+ end
+
+ def setup
+ @queue = ActiveSupport::Queue.new
+ @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
+ @consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger)
+ end
+
+ def teardown
+ @queue.push nil
+ end
+
+ test "the jobs are executed" do
+ ran = false
+
+ job = Job.new(1) do
+ ran = true
+ end
+
+ @queue.push job
+ sleep 0.1
+ assert_equal true, ran
+ end
+
+ test "the jobs are not executed synchronously" do
+ ran = false
+
+ job = Job.new(1) do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.push job
+ assert_equal false, ran
+ end
+
+ test "shutting down the queue synchronously drains the jobs" do
+ ran = false
+
+ job = Job.new(1) do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.push job
+ assert_equal false, ran
+
+ @consumer.shutdown
+
+ assert_equal true, ran
+ end
+
+ test "log job that raises an exception" do
+ job = Job.new(1) do
+ raise "RuntimeError: Error!"
+ end
+
+ @queue.push job
+ sleep 0.1
+
+ assert_equal 1, @logger.logged(:error).size
+ assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last)
+ end
+
+ test "test overriding exception handling" do
+ @consumer.shutdown
+ @consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do
+ attr_reader :last_error
+ def handle_exception(e)
+ @last_error = e.message
+ end
+ end.start(@queue)
+
+ job = Job.new(1) do
+ raise "RuntimeError: Error!"
+ end
+
+ @queue.push job
+ sleep 0.1
+
+ assert_equal "RuntimeError: Error!", @consumer.last_error
+ end
+end