aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/test/queueing
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/test/queueing')
-rw-r--r--activesupport/test/queueing/container_test.rb28
-rw-r--r--activesupport/test/queueing/synchronous_queue_test.rb27
-rw-r--r--activesupport/test/queueing/test_queue_test.rb102
-rw-r--r--activesupport/test/queueing/threaded_consumer_test.rb92
4 files changed, 249 insertions, 0 deletions
diff --git a/activesupport/test/queueing/container_test.rb b/activesupport/test/queueing/container_test.rb
new file mode 100644
index 0000000000..7afc11e7a9
--- /dev/null
+++ b/activesupport/test/queueing/container_test.rb
@@ -0,0 +1,28 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+
+module ActiveSupport
+ class ContainerTest < ActiveSupport::TestCase
+ def test_delegates_to_default
+ q = Queue.new
+ container = QueueContainer.new q
+ job = Object.new
+
+ container.push job
+ assert_equal job, q.pop
+ end
+
+ def test_access_default
+ q = Queue.new
+ container = QueueContainer.new q
+ assert_equal q, container[:default]
+ end
+
+ def test_assign_queue
+ container = QueueContainer.new Object.new
+ q = Object.new
+ container[:foo] = q
+ assert_equal q, container[:foo]
+ end
+ end
+end
diff --git a/activesupport/test/queueing/synchronous_queue_test.rb b/activesupport/test/queueing/synchronous_queue_test.rb
new file mode 100644
index 0000000000..86c39d0f6c
--- /dev/null
+++ b/activesupport/test/queueing/synchronous_queue_test.rb
@@ -0,0 +1,27 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+
+class SynchronousQueueTest < ActiveSupport::TestCase
+ class Job
+ attr_reader :ran
+ def run; @ran = true end
+ end
+
+ class ExceptionRaisingJob
+ def run; raise end
+ end
+
+ def setup
+ @queue = ActiveSupport::SynchronousQueue.new
+ end
+
+ def test_runs_jobs_immediately
+ job = Job.new
+ @queue.push job
+ assert job.ran
+
+ assert_raises RuntimeError do
+ @queue.push ExceptionRaisingJob.new
+ end
+ end
+end
diff --git a/activesupport/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb
new file mode 100644
index 0000000000..e398a48bea
--- /dev/null
+++ b/activesupport/test/queueing/test_queue_test.rb
@@ -0,0 +1,102 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+
+class TestQueueTest < ActiveSupport::TestCase
+ def setup
+ @queue = ActiveSupport::TestQueue.new
+ end
+
+ class ExceptionRaisingJob
+ def run
+ raise
+ end
+ end
+
+ def test_drain_raises_exceptions_from_running_jobs
+ @queue.push ExceptionRaisingJob.new
+ assert_raises(RuntimeError) { @queue.drain }
+ end
+
+ def test_jobs
+ @queue.push 1
+ @queue.push 2
+ assert_equal [1,2], @queue.jobs
+ end
+
+ class EquivalentJob
+ def initialize
+ @initial_id = self.object_id
+ end
+
+ def run
+ end
+
+ def ==(other)
+ other.same_initial_id?(@initial_id)
+ end
+
+ def same_initial_id?(other_id)
+ other_id == @initial_id
+ end
+ end
+
+ def test_contents
+ job = EquivalentJob.new
+ assert @queue.empty?
+ @queue.push job
+ refute @queue.empty?
+ assert_equal job, @queue.pop
+ end
+
+ class ProcessingJob
+ def self.clear_processed
+ @processed = []
+ end
+
+ def self.processed
+ @processed
+ end
+
+ def initialize(object)
+ @object = object
+ end
+
+ def run
+ self.class.processed << @object
+ end
+ end
+
+ def test_order
+ ProcessingJob.clear_processed
+ job1 = ProcessingJob.new(1)
+ job2 = ProcessingJob.new(2)
+
+ @queue.push job1
+ @queue.push job2
+ @queue.drain
+
+ assert_equal [1,2], ProcessingJob.processed
+ end
+
+ class ThreadTrackingJob
+ attr_reader :thread_id
+
+ def run
+ @thread_id = Thread.current.object_id
+ end
+
+ def ran?
+ @thread_id
+ end
+ end
+
+ def test_drain
+ @queue.push ThreadTrackingJob.new
+ job = @queue.jobs.last
+ @queue.drain
+
+ assert @queue.empty?
+ assert job.ran?, "The job runs synchronously when the queue is drained"
+ assert_equal job.thread_id, Thread.current.object_id
+ end
+end
diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb
new file mode 100644
index 0000000000..fc43cb555a
--- /dev/null
+++ b/activesupport/test/queueing/threaded_consumer_test.rb
@@ -0,0 +1,92 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+require "active_support/log_subscriber/test_helper"
+
+class TestThreadConsumer < ActiveSupport::TestCase
+ class Job
+ attr_reader :id
+ def initialize(id = 1, &block)
+ @id = id
+ @block = block
+ end
+
+ def run
+ @block.call if @block
+ end
+ end
+
+ def setup
+ @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
+ @queue = ActiveSupport::Queue.new(logger: @logger)
+ end
+
+ def teardown
+ @queue.drain
+ end
+
+ test "the jobs are executed" do
+ ran = false
+ job = Job.new { ran = true }
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal true, ran
+ end
+
+ test "the jobs are not executed synchronously" do
+ run, ran = Queue.new, Queue.new
+ job = Job.new { ran.push run.pop }
+
+ @queue.consumer.start
+ @queue.push job
+ assert ran.empty?
+
+ run.push true
+ assert_equal true, ran.pop
+ end
+
+ test "shutting down the queue synchronously drains the jobs" do
+ ran = false
+ job = Job.new do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.consumer.start
+ @queue.push job
+ assert_equal false, ran
+
+ @queue.consumer.shutdown
+ assert_equal true, ran
+ end
+
+ test "log job that raises an exception" do
+ job = Job.new { raise "RuntimeError: Error!" }
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal 1, @logger.logged(:error).size
+ assert_match 'Job Error: RuntimeError: Error!', @logger.logged(:error).last
+ end
+
+ test "test overriding exception handling" do
+ @queue.consumer.instance_eval do
+ def handle_exception(job, exception)
+ @last_error = exception.message
+ end
+
+ def last_error
+ @last_error
+ end
+ end
+
+ job = Job.new { raise "RuntimeError: Error!" }
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal "RuntimeError: Error!", @queue.consumer.last_error
+ end
+end