diff options
Diffstat (limited to 'activerecord/test/cases/connection_pool_test.rb')
-rw-r--r-- | activerecord/test/cases/connection_pool_test.rb | 627 |
1 files changed, 627 insertions, 0 deletions
diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb new file mode 100644 index 0000000000..bfaaa3c54e --- /dev/null +++ b/activerecord/test/cases/connection_pool_test.rb @@ -0,0 +1,627 @@ +# frozen_string_literal: true + +require "cases/helper" +require "concurrent/atomic/count_down_latch" + +module ActiveRecord + module ConnectionAdapters + class ConnectionPoolTest < ActiveRecord::TestCase + attr_reader :pool + + def setup + super + + # Keep a duplicate pool so we do not bother others + @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec + + if in_memory_db? + # Separate connections to an in-memory database create an entirely new database, + # with an empty schema etc, so we just stub out this schema on the fly. + @pool.with_connection do |connection| + connection.create_table :posts do |t| + t.integer :cololumn + end + end + end + end + + teardown do + @pool.disconnect! + end + + def active_connections(pool) + pool.connections.find_all(&:in_use?) + end + + def test_checkout_after_close + connection = pool.connection + assert_predicate connection, :in_use? + + connection.close + assert_not_predicate connection, :in_use? + + assert_predicate pool.connection, :in_use? + end + + def test_released_connection_moves_between_threads + thread_conn = nil + + Thread.new { + pool.with_connection do |conn| + thread_conn = conn + end + }.join + + assert thread_conn + + Thread.new { + pool.with_connection do |conn| + assert_equal thread_conn, conn + end + }.join + end + + def test_with_connection + assert_equal 0, active_connections(pool).size + + main_thread = pool.connection + assert_equal 1, active_connections(pool).size + + Thread.new { + pool.with_connection do |conn| + assert conn + assert_equal 2, active_connections(pool).size + end + assert_equal 1, active_connections(pool).size + }.join + + main_thread.close + assert_equal 0, active_connections(pool).size + end + + def test_active_connection_in_use + assert_not_predicate pool, :active_connection? + main_thread = pool.connection + + assert_predicate pool, :active_connection? + + main_thread.close + + assert_not_predicate pool, :active_connection? + end + + def test_full_pool_exception + @pool.size.times { assert @pool.checkout } + assert_raises(ConnectionTimeoutError) do + @pool.checkout + end + end + + def test_full_pool_blocks + cs = @pool.size.times.map { @pool.checkout } + t = Thread.new { @pool.checkout } + + # make sure our thread is in the timeout section + Thread.pass until @pool.num_waiting_in_queue == 1 + + connection = cs.first + connection.close + assert_equal connection, t.join.value + end + + def test_removing_releases_latch + cs = @pool.size.times.map { @pool.checkout } + t = Thread.new { @pool.checkout } + + # make sure our thread is in the timeout section + Thread.pass until @pool.num_waiting_in_queue == 1 + + connection = cs.first + @pool.remove connection + assert_respond_to t.join.value, :execute + connection.close + end + + def test_reap_and_active + @pool.checkout + @pool.checkout + @pool.checkout + + connections = @pool.connections.dup + + @pool.reap + + assert_equal connections.length, @pool.connections.length + end + + def test_reap_inactive + ready = Concurrent::CountDownLatch.new + @pool.checkout + child = Thread.new do + @pool.checkout + @pool.checkout + ready.count_down + Thread.stop + end + ready.wait + + assert_equal 3, active_connections(@pool).size + + child.terminate + child.join + @pool.reap + + assert_equal 1, active_connections(@pool).size + ensure + @pool.connections.each { |conn| conn.close if conn.in_use? } + end + + def test_flush + idle_conn = @pool.checkout + recent_conn = @pool.checkout + active_conn = @pool.checkout + + @pool.checkin idle_conn + @pool.checkin recent_conn + + assert_equal 3, @pool.connections.length + + def idle_conn.seconds_idle + 1000 + end + + @pool.flush(30) + + assert_equal 2, @pool.connections.length + + assert_equal [recent_conn, active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__) + ensure + @pool.checkin active_conn + end + + def test_flush_bang + idle_conn = @pool.checkout + recent_conn = @pool.checkout + active_conn = @pool.checkout + _dead_conn = Thread.new { @pool.checkout }.join + + @pool.checkin idle_conn + @pool.checkin recent_conn + + assert_equal 4, @pool.connections.length + + def idle_conn.seconds_idle + 1000 + end + + @pool.flush! + + assert_equal 1, @pool.connections.length + + assert_equal [active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__) + ensure + @pool.checkin active_conn + end + + def test_remove_connection + conn = @pool.checkout + assert_predicate conn, :in_use? + + length = @pool.connections.length + @pool.remove conn + assert_predicate conn, :in_use? + assert_equal(length - 1, @pool.connections.length) + ensure + conn.close + end + + def test_remove_connection_for_thread + conn = @pool.connection + @pool.remove conn + assert_not_equal(conn, @pool.connection) + ensure + conn.close if conn + end + + def test_active_connection? + assert_not_predicate @pool, :active_connection? + assert @pool.connection + assert_predicate @pool, :active_connection? + @pool.release_connection + assert_not_predicate @pool, :active_connection? + end + + def test_checkout_behaviour + pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec + main_connection = pool.connection + assert_not_nil main_connection + threads = [] + 4.times do |i| + threads << Thread.new(i) do + thread_connection = pool.connection + assert_not_nil thread_connection + thread_connection.close + end + end + + threads.each(&:join) + + Thread.new do + assert pool.connection + pool.connection.close + end.join + end + + def test_checkout_order_is_lifo + conn1 = @pool.checkout + conn2 = @pool.checkout + @pool.checkin conn1 + @pool.checkin conn2 + assert_equal [conn2, conn1], 2.times.map { @pool.checkout } + end + + # The connection pool is "fair" if threads waiting for + # connections receive them in the order in which they began + # waiting. This ensures that we don't timeout one HTTP request + # even while well under capacity in a multi-threaded environment + # such as a Java servlet container. + # + # We don't need strict fairness: if two connections become + # available at the same time, it's fine if two threads that were + # waiting acquire the connections out of order. + # + # Thus this test prepares waiting threads and then trickles in + # available connections slowly, ensuring the wakeup order is + # correct in this case. + def test_checkout_fairness + @pool.instance_variable_set(:@size, 10) + expected = (1..@pool.size).to_a.freeze + # check out all connections so our threads start out waiting + conns = expected.map { @pool.checkout } + mutex = Mutex.new + order = [] + errors = [] + + threads = expected.map do |i| + t = Thread.new { + begin + @pool.checkout # never checked back in + mutex.synchronize { order << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until @pool.num_waiting_in_queue == i + t + end + + # this should wake up the waiting threads one by one in order + conns.each { |conn| @pool.checkin(conn); sleep 0.1 } + + threads.each(&:join) + + raise errors.first if errors.any? + + assert_equal(expected, order) + end + + # As mentioned in #test_checkout_fairness, we don't care about + # strict fairness. This test creates two groups of threads: + # group1 whose members all start waiting before any thread in + # group2. Enough connections are checked in to wakeup all + # group1 threads, and the fact that only group1 and no group2 + # threads acquired a connection is enforced. + def test_checkout_fairness_by_group + @pool.instance_variable_set(:@size, 10) + # take all the connections + conns = (1..10).map { @pool.checkout } + mutex = Mutex.new + successes = [] # threads that successfully got a connection + errors = [] + + make_thread = proc do |i| + t = Thread.new { + begin + @pool.checkout # never checked back in + mutex.synchronize { successes << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until @pool.num_waiting_in_queue == i + t + end + + # all group1 threads start waiting before any in group2 + group1 = (1..5).map(&make_thread) + group2 = (6..10).map(&make_thread) + + # checkin n connections back to the pool + checkin = proc do |n| + n.times do + c = conns.pop + @pool.checkin(c) + end + end + + checkin.call(group1.size) # should wake up all group1 + + loop do + sleep 0.1 + break if mutex.synchronize { (successes.size + errors.size) == group1.size } + end + + winners = mutex.synchronize { successes.dup } + checkin.call(group2.size) # should wake up everyone remaining + + group1.each(&:join) + group2.each(&:join) + + assert_equal((1..group1.size).to_a, winners.sort) + + if errors.any? + raise errors.first + end + end + + def test_automatic_reconnect_restores_after_disconnect + pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec + assert pool.automatic_reconnect + assert pool.connection + + pool.disconnect! + assert pool.connection + end + + def test_automatic_reconnect_can_be_disabled + pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec + pool.disconnect! + pool.automatic_reconnect = false + + assert_raises(ConnectionNotEstablished) do + pool.connection + end + + assert_raises(ConnectionNotEstablished) do + pool.with_connection + end + end + + def test_pool_sets_connection_visitor + assert @pool.connection.visitor.is_a?(Arel::Visitors::ToSql) + end + + # make sure exceptions are thrown when establish_connection + # is called with an anonymous class + def test_anonymous_class_exception + anonymous = Class.new(ActiveRecord::Base) + + assert_raises(RuntimeError) do + anonymous.establish_connection + end + end + + class ConnectionTestModel < ActiveRecord::Base + end + + def test_connection_notification_is_called + payloads = [] + subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |name, started, finished, unique_id, payload| + payloads << payload + end + ConnectionTestModel.establish_connection :arunit + + assert_equal [:config, :connection_id, :spec_name], payloads[0].keys.sort + assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", payloads[0][:spec_name] + ensure + ActiveSupport::Notifications.unsubscribe(subscription) if subscription + end + + def test_pool_sets_connection_schema_cache + connection = pool.checkout + schema_cache = SchemaCache.new connection + schema_cache.add(:posts) + pool.schema_cache = schema_cache + + pool.with_connection do |conn| + assert_not_same pool.schema_cache, conn.schema_cache + assert_equal pool.schema_cache.size, conn.schema_cache.size + assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts) + end + + pool.checkin connection + end + + def test_concurrent_connection_establishment + assert_operator @pool.connections.size, :<=, 1 + + all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size) + all_go = Concurrent::CountDownLatch.new + + @pool.singleton_class.class_eval do + define_method(:new_connection) do + all_threads_in_new_connection.count_down + all_go.wait + super() + end + end + + connecting_threads = [] + @pool.size.times do + connecting_threads << Thread.new { @pool.checkout } + end + + begin + Timeout.timeout(5) do + # the kernel of the whole test is here, everything else is just scaffolding, + # this latch will not be released unless conn. pool allows for concurrent + # connection creation + all_threads_in_new_connection.wait + end + rescue Timeout::Error + flunk "pool unable to establish connections concurrently or implementation has " \ + "changed, this test then needs to patch a different :new_connection method" + ensure + # clean up the threads + all_go.count_down + connecting_threads.map(&:join) + end + end + + def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns + Thread.report_on_exception, original_report_on_exception = false, Thread.report_on_exception if Thread.respond_to?(:report_on_exception) + @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout + [:disconnect, :clear_reloadable_connections].each do |group_action_method| + @pool.with_connection do |connection| + assert_raises(ExclusiveConnectionTimeoutError) do + Thread.new { @pool.send(group_action_method) }.join + end + end + end + ensure + Thread.report_on_exception = original_report_on_exception if Thread.respond_to?(:report_on_exception) + end + + def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns + [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method| + begin + thread = timed_join_result = nil + @pool.with_connection do |connection| + thread = Thread.new { @pool.send(group_action_method) } + + # give the other `thread` some time to get stuck in `group_action_method` + timed_join_result = thread.join(0.3) + # thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to + # release our connection + assert_nil timed_join_result + + # assert that since this is within default timeout our connection hasn't been forcefully taken away from us + assert_predicate @pool, :active_connection? + end + ensure + thread.join if thread && !timed_join_result # clean up the other thread + end + end + end + + def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_acquire_all_connections_proceed_anyway + @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout + [:disconnect!, :clear_reloadable_connections!].each do |group_action_method| + @pool.with_connection do |connection| + Thread.new { @pool.send(group_action_method) }.join + # assert connection has been forcefully taken away from us + assert_not_predicate @pool, :active_connection? + + # make a new connection for with_connection to clean up + @pool.connection + end + end + end + + def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads + with_single_connection_pool do |pool| + [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method| + conn = pool.connection # drain the only available connection + second_thread_done = Concurrent::Event.new + + begin + # create a first_thread and let it get into the FIFO queue first + first_thread = Thread.new do + pool.with_connection { second_thread_done.wait } + end + + # wait for first_thread to get in queue + Thread.pass until pool.num_waiting_in_queue == 1 + + # create a different, later thread, that will attempt to do a "group action", + # but because of the group action semantics it should be able to preempt the + # first_thread when a connection is made available + second_thread = Thread.new do + pool.send(group_action_method) + second_thread_done.set + end + + # wait for second_thread to get in queue + Thread.pass until pool.num_waiting_in_queue == 2 + + # return the only available connection + pool.checkin(conn) + + # if the second_thread is not able to preempt the first_thread, + # they will temporarily (until either of them timeouts with ConnectionTimeoutError) + # deadlock and a join(2) timeout will be reached + assert second_thread.join(2), "#{group_action_method} is not able to preempt other waiting threads" + + ensure + # post test clean up + failed = !second_thread_done.set? + + if failed + second_thread_done.set + + first_thread.join(2) + second_thread.join(2) + end + + first_thread.join(10) || raise("first_thread got stuck") + second_thread.join(10) || raise("second_thread got stuck") + end + end + end + end + + def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary + with_single_connection_pool do |pool| + conn = pool.connection # drain the only available connection + def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections + true + end + + stuck_thread = Thread.new do + pool.with_connection {} + end + + # wait for stuck_thread to get in queue + Thread.pass until pool.num_waiting_in_queue == 1 + + pool.clear_reloadable_connections + + unless stuck_thread.join(2) + flunk "clear_reloadable_connections must not let other connection waiting threads get stuck in queue" + end + + assert_equal 0, pool.num_waiting_in_queue + end + end + + def test_connection_pool_stat + with_single_connection_pool do |pool| + pool.with_connection do |connection| + stats = pool.stat + assert_equal({ size: 1, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }, stats) + end + + stats = pool.stat + assert_equal({ size: 1, connections: 1, busy: 0, dead: 0, idle: 1, waiting: 0, checkout_timeout: 5 }, stats) + + Thread.new do + pool.checkout + Thread.current.kill + end.join + + stats = pool.stat + assert_equal({ size: 1, connections: 1, busy: 0, dead: 1, idle: 0, waiting: 0, checkout_timeout: 5 }, stats) + end + end + + private + def with_single_connection_pool + one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup + one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config + yield(pool = ConnectionPool.new(one_conn_spec)) + ensure + pool.disconnect! if pool + end + end + end +end |