path: root/activerecord/test/cases/connection_pool_test.rb
diff options
Diffstat (limited to 'activerecord/test/cases/connection_pool_test.rb')
1 files changed, 627 insertions, 0 deletions
diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb
new file mode 100644
index 0000000000..bfaaa3c54e
--- /dev/null
+++ b/activerecord/test/cases/connection_pool_test.rb
@@ -0,0 +1,627 @@
+# frozen_string_literal: true
+require "cases/helper"
+require "concurrent/atomic/count_down_latch"
+module ActiveRecord
+ module ConnectionAdapters
+ class ConnectionPoolTest < ActiveRecord::TestCase
+ attr_reader :pool
+ def setup
+ super
+ # Keep a duplicate pool so we do not bother others
+ @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
+ if in_memory_db?
+ # Separate connections to an in-memory database create an entirely new database,
+ # with an empty schema etc, so we just stub out this schema on the fly.
+ @pool.with_connection do |connection|
+ connection.create_table :posts do |t|
+ t.integer :cololumn
+ end
+ end
+ end
+ end
+ teardown do
+ @pool.disconnect!
+ end
+ def active_connections(pool)
+ pool.connections.find_all(&:in_use?)
+ end
+ def test_checkout_after_close
+ connection = pool.connection
+ assert_predicate connection, :in_use?
+ connection.close
+ assert_not_predicate connection, :in_use?
+ assert_predicate pool.connection, :in_use?
+ end
+ def test_released_connection_moves_between_threads
+ thread_conn = nil
+ Thread.new {
+ pool.with_connection do |conn|
+ thread_conn = conn
+ end
+ }.join
+ assert thread_conn
+ Thread.new {
+ pool.with_connection do |conn|
+ assert_equal thread_conn, conn
+ end
+ }.join
+ end
+ def test_with_connection
+ assert_equal 0, active_connections(pool).size
+ main_thread = pool.connection
+ assert_equal 1, active_connections(pool).size
+ Thread.new {
+ pool.with_connection do |conn|
+ assert conn
+ assert_equal 2, active_connections(pool).size
+ end
+ assert_equal 1, active_connections(pool).size
+ }.join
+ main_thread.close
+ assert_equal 0, active_connections(pool).size
+ end
+ def test_active_connection_in_use
+ assert_not_predicate pool, :active_connection?
+ main_thread = pool.connection
+ assert_predicate pool, :active_connection?
+ main_thread.close
+ assert_not_predicate pool, :active_connection?
+ end
+ def test_full_pool_exception
+ @pool.size.times { assert @pool.checkout }
+ assert_raises(ConnectionTimeoutError) do
+ @pool.checkout
+ end
+ end
+ def test_full_pool_blocks
+ cs = @pool.size.times.map { @pool.checkout }
+ t = Thread.new { @pool.checkout }
+ # make sure our thread is in the timeout section
+ Thread.pass until @pool.num_waiting_in_queue == 1
+ connection = cs.first
+ connection.close
+ assert_equal connection, t.join.value
+ end
+ def test_removing_releases_latch
+ cs = @pool.size.times.map { @pool.checkout }
+ t = Thread.new { @pool.checkout }
+ # make sure our thread is in the timeout section
+ Thread.pass until @pool.num_waiting_in_queue == 1
+ connection = cs.first
+ @pool.remove connection
+ assert_respond_to t.join.value, :execute
+ connection.close
+ end
+ def test_reap_and_active
+ @pool.checkout
+ @pool.checkout
+ @pool.checkout
+ connections = @pool.connections.dup
+ @pool.reap
+ assert_equal connections.length, @pool.connections.length
+ end
+ def test_reap_inactive
+ ready = Concurrent::CountDownLatch.new
+ @pool.checkout
+ child = Thread.new do
+ @pool.checkout
+ @pool.checkout
+ ready.count_down
+ Thread.stop
+ end
+ ready.wait
+ assert_equal 3, active_connections(@pool).size
+ child.terminate
+ child.join
+ @pool.reap
+ assert_equal 1, active_connections(@pool).size
+ ensure
+ @pool.connections.each { |conn| conn.close if conn.in_use? }
+ end
+ def test_flush
+ idle_conn = @pool.checkout
+ recent_conn = @pool.checkout
+ active_conn = @pool.checkout
+ @pool.checkin idle_conn
+ @pool.checkin recent_conn
+ assert_equal 3, @pool.connections.length
+ def idle_conn.seconds_idle
+ 1000
+ end
+ @pool.flush(30)
+ assert_equal 2, @pool.connections.length
+ assert_equal [recent_conn, active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__)
+ ensure
+ @pool.checkin active_conn
+ end
+ def test_flush_bang
+ idle_conn = @pool.checkout
+ recent_conn = @pool.checkout
+ active_conn = @pool.checkout
+ _dead_conn = Thread.new { @pool.checkout }.join
+ @pool.checkin idle_conn
+ @pool.checkin recent_conn
+ assert_equal 4, @pool.connections.length
+ def idle_conn.seconds_idle
+ 1000
+ end
+ @pool.flush!
+ assert_equal 1, @pool.connections.length
+ assert_equal [active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__)
+ ensure
+ @pool.checkin active_conn
+ end
+ def test_remove_connection
+ conn = @pool.checkout
+ assert_predicate conn, :in_use?
+ length = @pool.connections.length
+ @pool.remove conn
+ assert_predicate conn, :in_use?
+ assert_equal(length - 1, @pool.connections.length)
+ ensure
+ conn.close
+ end
+ def test_remove_connection_for_thread
+ conn = @pool.connection
+ @pool.remove conn
+ assert_not_equal(conn, @pool.connection)
+ ensure
+ conn.close if conn
+ end
+ def test_active_connection?
+ assert_not_predicate @pool, :active_connection?
+ assert @pool.connection
+ assert_predicate @pool, :active_connection?
+ @pool.release_connection
+ assert_not_predicate @pool, :active_connection?
+ end
+ def test_checkout_behaviour
+ pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
+ main_connection = pool.connection
+ assert_not_nil main_connection
+ threads = []
+ 4.times do |i|
+ threads << Thread.new(i) do
+ thread_connection = pool.connection
+ assert_not_nil thread_connection
+ thread_connection.close
+ end
+ end
+ threads.each(&:join)
+ Thread.new do
+ assert pool.connection
+ pool.connection.close
+ end.join
+ end
+ def test_checkout_order_is_lifo
+ conn1 = @pool.checkout
+ conn2 = @pool.checkout
+ @pool.checkin conn1
+ @pool.checkin conn2
+ assert_equal [conn2, conn1], 2.times.map { @pool.checkout }
+ end
+ # The connection pool is "fair" if threads waiting for
+ # connections receive them in the order in which they began
+ # waiting. This ensures that we don't timeout one HTTP request
+ # even while well under capacity in a multi-threaded environment
+ # such as a Java servlet container.
+ #
+ # We don't need strict fairness: if two connections become
+ # available at the same time, it's fine if two threads that were
+ # waiting acquire the connections out of order.
+ #
+ # Thus this test prepares waiting threads and then trickles in
+ # available connections slowly, ensuring the wakeup order is
+ # correct in this case.
+ def test_checkout_fairness
+ @pool.instance_variable_set(:@size, 10)
+ expected = (1..@pool.size).to_a.freeze
+ # check out all connections so our threads start out waiting
+ conns = expected.map { @pool.checkout }
+ mutex = Mutex.new
+ order = []
+ errors = []
+ threads = expected.map do |i|
+ t = Thread.new {
+ begin
+ @pool.checkout # never checked back in
+ mutex.synchronize { order << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until @pool.num_waiting_in_queue == i
+ t
+ end
+ # this should wake up the waiting threads one by one in order
+ conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
+ threads.each(&:join)
+ raise errors.first if errors.any?
+ assert_equal(expected, order)
+ end
+ # As mentioned in #test_checkout_fairness, we don't care about
+ # strict fairness. This test creates two groups of threads:
+ # group1 whose members all start waiting before any thread in
+ # group2. Enough connections are checked in to wakeup all
+ # group1 threads, and the fact that only group1 and no group2
+ # threads acquired a connection is enforced.
+ def test_checkout_fairness_by_group
+ @pool.instance_variable_set(:@size, 10)
+ # take all the connections
+ conns = (1..10).map { @pool.checkout }
+ mutex = Mutex.new
+ successes = [] # threads that successfully got a connection
+ errors = []
+ make_thread = proc do |i|
+ t = Thread.new {
+ begin
+ @pool.checkout # never checked back in
+ mutex.synchronize { successes << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until @pool.num_waiting_in_queue == i
+ t
+ end
+ # all group1 threads start waiting before any in group2
+ group1 = (1..5).map(&make_thread)
+ group2 = (6..10).map(&make_thread)
+ # checkin n connections back to the pool
+ checkin = proc do |n|
+ n.times do
+ c = conns.pop
+ @pool.checkin(c)
+ end
+ end
+ checkin.call(group1.size) # should wake up all group1
+ loop do
+ sleep 0.1
+ break if mutex.synchronize { (successes.size + errors.size) == group1.size }
+ end
+ winners = mutex.synchronize { successes.dup }
+ checkin.call(group2.size) # should wake up everyone remaining
+ group1.each(&:join)
+ group2.each(&:join)
+ assert_equal((1..group1.size).to_a, winners.sort)
+ if errors.any?
+ raise errors.first
+ end
+ end
+ def test_automatic_reconnect_restores_after_disconnect
+ pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
+ assert pool.automatic_reconnect
+ assert pool.connection
+ pool.disconnect!
+ assert pool.connection
+ end
+ def test_automatic_reconnect_can_be_disabled
+ pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
+ pool.disconnect!
+ pool.automatic_reconnect = false
+ assert_raises(ConnectionNotEstablished) do
+ pool.connection
+ end
+ assert_raises(ConnectionNotEstablished) do
+ pool.with_connection
+ end
+ end
+ def test_pool_sets_connection_visitor
+ assert @pool.connection.visitor.is_a?(Arel::Visitors::ToSql)
+ end
+ # make sure exceptions are thrown when establish_connection
+ # is called with an anonymous class
+ def test_anonymous_class_exception
+ anonymous = Class.new(ActiveRecord::Base)
+ assert_raises(RuntimeError) do
+ anonymous.establish_connection
+ end
+ end
+ class ConnectionTestModel < ActiveRecord::Base
+ end
+ def test_connection_notification_is_called
+ payloads = []
+ subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |name, started, finished, unique_id, payload|
+ payloads << payload
+ end
+ ConnectionTestModel.establish_connection :arunit
+ assert_equal [:config, :connection_id, :spec_name], payloads[0].keys.sort
+ assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", payloads[0][:spec_name]
+ ensure
+ ActiveSupport::Notifications.unsubscribe(subscription) if subscription
+ end
+ def test_pool_sets_connection_schema_cache
+ connection = pool.checkout
+ schema_cache = SchemaCache.new connection
+ schema_cache.add(:posts)
+ pool.schema_cache = schema_cache
+ pool.with_connection do |conn|
+ assert_not_same pool.schema_cache, conn.schema_cache
+ assert_equal pool.schema_cache.size, conn.schema_cache.size
+ assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts)
+ end
+ pool.checkin connection
+ end
+ def test_concurrent_connection_establishment
+ assert_operator @pool.connections.size, :<=, 1
+ all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size)
+ all_go = Concurrent::CountDownLatch.new
+ @pool.singleton_class.class_eval do
+ define_method(:new_connection) do
+ all_threads_in_new_connection.count_down
+ all_go.wait
+ super()
+ end
+ end
+ connecting_threads = []
+ @pool.size.times do
+ connecting_threads << Thread.new { @pool.checkout }
+ end
+ begin
+ Timeout.timeout(5) do
+ # the kernel of the whole test is here, everything else is just scaffolding,
+ # this latch will not be released unless conn. pool allows for concurrent
+ # connection creation
+ all_threads_in_new_connection.wait
+ end
+ rescue Timeout::Error
+ flunk "pool unable to establish connections concurrently or implementation has " \
+ "changed, this test then needs to patch a different :new_connection method"
+ ensure
+ # clean up the threads
+ all_go.count_down
+ connecting_threads.map(&:join)
+ end
+ end
+ def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
+ Thread.report_on_exception, original_report_on_exception = false, Thread.report_on_exception if Thread.respond_to?(:report_on_exception)
+ @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
+ [:disconnect, :clear_reloadable_connections].each do |group_action_method|
+ @pool.with_connection do |connection|
+ assert_raises(ExclusiveConnectionTimeoutError) do
+ Thread.new { @pool.send(group_action_method) }.join
+ end
+ end
+ end
+ ensure
+ Thread.report_on_exception = original_report_on_exception if Thread.respond_to?(:report_on_exception)
+ end
+ def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
+ [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
+ begin
+ thread = timed_join_result = nil
+ @pool.with_connection do |connection|
+ thread = Thread.new { @pool.send(group_action_method) }
+ # give the other `thread` some time to get stuck in `group_action_method`
+ timed_join_result = thread.join(0.3)
+ # thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to
+ # release our connection
+ assert_nil timed_join_result
+ # assert that since this is within default timeout our connection hasn't been forcefully taken away from us
+ assert_predicate @pool, :active_connection?
+ end
+ ensure
+ thread.join if thread && !timed_join_result # clean up the other thread
+ end
+ end
+ end
+ def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_acquire_all_connections_proceed_anyway
+ @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
+ [:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
+ @pool.with_connection do |connection|
+ Thread.new { @pool.send(group_action_method) }.join
+ # assert connection has been forcefully taken away from us
+ assert_not_predicate @pool, :active_connection?
+ # make a new connection for with_connection to clean up
+ @pool.connection
+ end
+ end
+ end
+ def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
+ with_single_connection_pool do |pool|
+ [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
+ conn = pool.connection # drain the only available connection
+ second_thread_done = Concurrent::Event.new
+ begin
+ # create a first_thread and let it get into the FIFO queue first
+ first_thread = Thread.new do
+ pool.with_connection { second_thread_done.wait }
+ end
+ # wait for first_thread to get in queue
+ Thread.pass until pool.num_waiting_in_queue == 1
+ # create a different, later thread, that will attempt to do a "group action",
+ # but because of the group action semantics it should be able to preempt the
+ # first_thread when a connection is made available
+ second_thread = Thread.new do
+ pool.send(group_action_method)
+ second_thread_done.set
+ end
+ # wait for second_thread to get in queue
+ Thread.pass until pool.num_waiting_in_queue == 2
+ # return the only available connection
+ pool.checkin(conn)
+ # if the second_thread is not able to preempt the first_thread,
+ # they will temporarily (until either of them timeouts with ConnectionTimeoutError)
+ # deadlock and a join(2) timeout will be reached
+ assert second_thread.join(2), "#{group_action_method} is not able to preempt other waiting threads"
+ ensure
+ # post test clean up
+ failed = !second_thread_done.set?
+ if failed
+ second_thread_done.set
+ first_thread.join(2)
+ second_thread.join(2)
+ end
+ first_thread.join(10) || raise("first_thread got stuck")
+ second_thread.join(10) || raise("second_thread got stuck")
+ end
+ end
+ end
+ end
+ def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
+ with_single_connection_pool do |pool|
+ conn = pool.connection # drain the only available connection
+ def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
+ true
+ end
+ stuck_thread = Thread.new do
+ pool.with_connection {}
+ end
+ # wait for stuck_thread to get in queue
+ Thread.pass until pool.num_waiting_in_queue == 1
+ pool.clear_reloadable_connections
+ unless stuck_thread.join(2)
+ flunk "clear_reloadable_connections must not let other connection waiting threads get stuck in queue"
+ end
+ assert_equal 0, pool.num_waiting_in_queue
+ end
+ end
+ def test_connection_pool_stat
+ with_single_connection_pool do |pool|
+ pool.with_connection do |connection|
+ stats = pool.stat
+ assert_equal({ size: 1, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }, stats)
+ end
+ stats = pool.stat
+ assert_equal({ size: 1, connections: 1, busy: 0, dead: 0, idle: 1, waiting: 0, checkout_timeout: 5 }, stats)
+ Thread.new do
+ pool.checkout
+ Thread.current.kill
+ end.join
+ stats = pool.stat
+ assert_equal({ size: 1, connections: 1, busy: 0, dead: 1, idle: 0, waiting: 0, checkout_timeout: 5 }, stats)
+ end
+ end
+ private
+ def with_single_connection_pool
+ one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
+ one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
+ yield(pool = ConnectionPool.new(one_conn_spec))
+ ensure
+ pool.disconnect! if pool
+ end
+ end
+ end