aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/test/queueing
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/test/queueing')
-rw-r--r--activesupport/test/queueing/synchronous_queue_test.rb27
-rw-r--r--activesupport/test/queueing/test_queue_test.rb146
-rw-r--r--activesupport/test/queueing/threaded_consumer_test.rb110
3 files changed, 0 insertions, 283 deletions
diff --git a/activesupport/test/queueing/synchronous_queue_test.rb b/activesupport/test/queueing/synchronous_queue_test.rb
deleted file mode 100644
index 86c39d0f6c..0000000000
--- a/activesupport/test/queueing/synchronous_queue_test.rb
+++ /dev/null
@@ -1,27 +0,0 @@
-require 'abstract_unit'
-require 'active_support/queueing'
-
-class SynchronousQueueTest < ActiveSupport::TestCase
- class Job
- attr_reader :ran
- def run; @ran = true end
- end
-
- class ExceptionRaisingJob
- def run; raise end
- end
-
- def setup
- @queue = ActiveSupport::SynchronousQueue.new
- end
-
- def test_runs_jobs_immediately
- job = Job.new
- @queue.push job
- assert job.ran
-
- assert_raises RuntimeError do
- @queue.push ExceptionRaisingJob.new
- end
- end
-end
diff --git a/activesupport/test/queueing/test_queue_test.rb b/activesupport/test/queueing/test_queue_test.rb
deleted file mode 100644
index 451fb68d3e..0000000000
--- a/activesupport/test/queueing/test_queue_test.rb
+++ /dev/null
@@ -1,146 +0,0 @@
-require 'abstract_unit'
-require 'active_support/queueing'
-
-class TestQueueTest < ActiveSupport::TestCase
- def setup
- @queue = ActiveSupport::TestQueue.new
- end
-
- class ExceptionRaisingJob
- def run
- raise
- end
- end
-
- def test_drain_raises_exceptions_from_running_jobs
- @queue.push ExceptionRaisingJob.new
- assert_raises(RuntimeError) { @queue.drain }
- end
-
- def test_jobs
- @queue.push 1
- @queue.push 2
- assert_equal [1,2], @queue.jobs
- end
-
- class EquivalentJob
- def initialize
- @initial_id = self.object_id
- end
-
- def run
- end
-
- def ==(other)
- other.same_initial_id?(@initial_id)
- end
-
- def same_initial_id?(other_id)
- other_id == @initial_id
- end
- end
-
- def test_contents
- job = EquivalentJob.new
- assert @queue.empty?
- @queue.push job
- refute @queue.empty?
- assert_equal job, @queue.pop
- end
-
- class ProcessingJob
- def self.clear_processed
- @processed = []
- end
-
- def self.processed
- @processed
- end
-
- def initialize(object)
- @object = object
- end
-
- def run
- self.class.processed << @object
- end
- end
-
- def test_order
- ProcessingJob.clear_processed
- job1 = ProcessingJob.new(1)
- job2 = ProcessingJob.new(2)
-
- @queue.push job1
- @queue.push job2
- @queue.drain
-
- assert_equal [1,2], ProcessingJob.processed
- end
-
- class ThreadTrackingJob
- attr_reader :thread_id
-
- def run
- @thread_id = Thread.current.object_id
- end
-
- def ran?
- @thread_id
- end
- end
-
- def test_drain
- @queue.push ThreadTrackingJob.new
- job = @queue.jobs.last
- @queue.drain
-
- assert @queue.empty?
- assert job.ran?, "The job runs synchronously when the queue is drained"
- assert_equal job.thread_id, Thread.current.object_id
- end
-
- class IdentifiableJob
- def initialize(id)
- @id = id
- end
-
- def ==(other)
- other.same_id?(@id)
- end
-
- def same_id?(other_id)
- other_id == @id
- end
-
- def run
- end
- end
-
- def test_queue_can_be_observed
- jobs = (1..10).map do |id|
- IdentifiableJob.new(id)
- end
-
- jobs.each do |job|
- @queue.push job
- end
-
- assert_equal jobs, @queue.jobs
- end
-
- def test_adding_an_unmarshallable_job
- anonymous_class_instance = Struct.new(:run).new
-
- assert_raises TypeError do
- @queue.push anonymous_class_instance
- end
- end
-
- def test_attempting_to_add_a_reference_to_itself
- job = {reference: @queue}
- assert_raises TypeError do
- @queue.push job
- end
- end
-end
diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb
deleted file mode 100644
index a3ca46a261..0000000000
--- a/activesupport/test/queueing/threaded_consumer_test.rb
+++ /dev/null
@@ -1,110 +0,0 @@
-require 'abstract_unit'
-require 'active_support/queueing'
-require "active_support/log_subscriber/test_helper"
-
-class TestThreadConsumer < ActiveSupport::TestCase
- class Job
- attr_reader :id
- def initialize(id = 1, &block)
- @id = id
- @block = block
- end
-
- def run
- @block.call if @block
- end
- end
-
- def setup
- @logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
- @queue = ActiveSupport::Queue.new(logger: @logger)
- end
-
- def teardown
- @queue.drain
- end
-
- test "the jobs are executed" do
- ran = false
- job = Job.new { ran = true }
-
- @queue.push job
- @queue.drain
-
- assert_equal true, ran
- end
-
- test "the jobs are not executed synchronously" do
- run, ran = Queue.new, Queue.new
- job = Job.new { ran.push run.pop }
-
- @queue.consumer.start
- @queue.push job
- assert ran.empty?
-
- run.push true
- assert_equal true, ran.pop
- end
-
- test "shutting down the queue synchronously drains the jobs" do
- ran = false
- job = Job.new do
- sleep 0.1
- ran = true
- end
-
- @queue.consumer.start
- @queue.push job
- assert_equal false, ran
-
- @queue.consumer.shutdown
- assert_equal true, ran
- end
-
- test "log job that raises an exception" do
- job = Job.new { raise "RuntimeError: Error!" }
-
- @queue.push job
- consume_queue @queue
-
- assert_equal 1, @logger.logged(:error).size
- assert_match "Job Error: #{job.inspect}\nRuntimeError: Error!", @logger.logged(:error).last
- end
-
- test "logger defaults to stderr" do
- begin
- $stderr, old_stderr = StringIO.new, $stderr
- queue = ActiveSupport::Queue.new
- queue.push Job.new { raise "RuntimeError: Error!" }
- consume_queue queue
- assert_match 'Job Error', $stderr.string
- ensure
- $stderr = old_stderr
- end
- end
-
- test "test overriding exception handling" do
- @queue.consumer.instance_eval do
- def handle_exception(job, exception)
- @last_error = exception.message
- end
-
- def last_error
- @last_error
- end
- end
-
- job = Job.new { raise "RuntimeError: Error!" }
-
- @queue.push job
- consume_queue @queue
-
- assert_equal "RuntimeError: Error!", @queue.consumer.last_error
- end
-
- private
- def consume_queue(queue)
- queue.push nil
- queue.consumer.consume
- end
-end