aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/test/queueing/threaded_consumer_test.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/test/queueing/threaded_consumer_test.rb')
-rw-r--r--activesupport/test/queueing/threaded_consumer_test.rb92
1 files changed, 92 insertions, 0 deletions
diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb
new file mode 100644
index 0000000000..fc43cb555a
--- /dev/null
+++ b/activesupport/test/queueing/threaded_consumer_test.rb
@@ -0,0 +1,92 @@
+require 'abstract_unit'
+require 'active_support/queueing'
+require "active_support/log_subscriber/test_helper"
+
+class TestThreadConsumer < ActiveSupport::TestCase
+ class Job
+ attr_reader :id
+ def initialize(id = 1, &block)
+ @id = id
+ @block = block
+ end
+
+ def run
+ @block.call if @block
+ end
+ end
+
+ def setup
+ @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
+ @queue = ActiveSupport::Queue.new(logger: @logger)
+ end
+
+ def teardown
+ @queue.drain
+ end
+
+ test "the jobs are executed" do
+ ran = false
+ job = Job.new { ran = true }
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal true, ran
+ end
+
+ test "the jobs are not executed synchronously" do
+ run, ran = Queue.new, Queue.new
+ job = Job.new { ran.push run.pop }
+
+ @queue.consumer.start
+ @queue.push job
+ assert ran.empty?
+
+ run.push true
+ assert_equal true, ran.pop
+ end
+
+ test "shutting down the queue synchronously drains the jobs" do
+ ran = false
+ job = Job.new do
+ sleep 0.1
+ ran = true
+ end
+
+ @queue.consumer.start
+ @queue.push job
+ assert_equal false, ran
+
+ @queue.consumer.shutdown
+ assert_equal true, ran
+ end
+
+ test "log job that raises an exception" do
+ job = Job.new { raise "RuntimeError: Error!" }
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal 1, @logger.logged(:error).size
+ assert_match 'Job Error: RuntimeError: Error!', @logger.logged(:error).last
+ end
+
+ test "test overriding exception handling" do
+ @queue.consumer.instance_eval do
+ def handle_exception(job, exception)
+ @last_error = exception.message
+ end
+
+ def last_error
+ @last_error
+ end
+ end
+
+ job = Job.new { raise "RuntimeError: Error!" }
+
+ @queue.push job
+ @queue.drain
+
+ assert_equal "RuntimeError: Error!", @queue.consumer.last_error
+ end
+end