aboutsummaryrefslogblamecommitdiffstats
path: root/activerecord/test/cases/connection_pool_test.rb
blob: 633d56e479833d42d890c141c1c9788b721e6325 (plain) (tree)
1
2
3
4
5
6
7
8
9
10

                             
                      
                                            



                                                     

                       
               

             

                                                                          









                                                                                          

         
                 
                         

         



                                            

                                    
                                             

                        
                                                 
 
                                                  

         

















                                                        


















                                                         
                                                      

                                     
                                                  


                         
                                                      

         
                                  
                                                                                                              
                                                  
 
                                                
                        


           




                                                        
                                                         





                                             





































                                                             




                                                        
                                                         



                                                
                        

         
                              


                      




                                           



                                                                 
                                              
                      


                             
                          
                     
           
                  

                                                      
 

                       

                  
                                                      
            
                                                                    

         







                                                      



                                          
 


                                                




                                          











                                                      



                                        




                                                









                                                



                                          

































                                                                                                      

                                
                                       


                                         
                                       




                                                          




                                                
                          

         
                                 
                                                       
                               
                                                   
                                
                                                       

         

                                                                         

                                         

                      
                                     


                                               

             
 
                            
 
                     
                                

                               
         
 
                                     






                                                                   
                                                            
                                                                 




                                                                      
                                                                      
















                                                                    
                                                    




                                               
                                                           





























                                                                      
                                                    




                                                  
                                                           


































                                                                                      
                                                            





                                                                         
         
 

                                                                         










                                                  



                                                                    
 
                                                                 
                                         






                                                 
 


                                                    

                                                
                                                                                                                                           

                             

                                                        
                                                                                 
                                                                                                                         



                                                                              













                                                                                          

                                                  

                                                      

                                                                                                           


                                           

                                                    













                                                                                        
                                              

                             
                                                                                            
                                                                                           

                                
                           


                                        

                                                                                                                      
                                                                                                    







                                                                                                              
            
                                                                 















                                                                                                                                
                                                         






                                                                                   
                                                                                                                               




                                                                                                              
                                                           


                                                                   







                                                                                                                                  
                                                      
 


































                                                                                                                 

                                     



                                                                        
               











                                                                                                                
                                    







                                                          
                                                                                                                 





                                                   



















                                                                                                                        
             






                                                                                                                                 


       
# frozen_string_literal: true

require "cases/helper"
require "concurrent/atomic/count_down_latch"

module ActiveRecord
  module ConnectionAdapters
    class ConnectionPoolTest < ActiveRecord::TestCase
      attr_reader :pool

      def setup
        super

        # Keep a duplicate pool so we do not bother others
        @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec

        if in_memory_db?
          # Separate connections to an in-memory database create an entirely new database,
          # with an empty schema etc, so we just stub out this schema on the fly.
          @pool.with_connection do |connection|
            connection.create_table :posts do |t|
              t.integer :cololumn
            end
          end
        end
      end

      teardown do
        @pool.disconnect!
      end

      def active_connections(pool)
        pool.connections.find_all(&:in_use?)
      end

      def test_checkout_after_close
        connection = pool.connection
        assert_predicate connection, :in_use?

        connection.close
        assert_not_predicate connection, :in_use?

        assert_predicate pool.connection, :in_use?
      end

      def test_released_connection_moves_between_threads
        thread_conn = nil

        Thread.new {
          pool.with_connection do |conn|
            thread_conn = conn
          end
        }.join

        assert thread_conn

        Thread.new {
          pool.with_connection do |conn|
            assert_equal thread_conn, conn
          end
        }.join
      end

      def test_with_connection
        assert_equal 0, active_connections(pool).size

        main_thread = pool.connection
        assert_equal 1, active_connections(pool).size

        Thread.new {
          pool.with_connection do |conn|
            assert conn
            assert_equal 2, active_connections(pool).size
          end
          assert_equal 1, active_connections(pool).size
        }.join

        main_thread.close
        assert_equal 0, active_connections(pool).size
      end

      def test_active_connection_in_use
        assert_not_predicate pool, :active_connection?
        main_thread = pool.connection

        assert_predicate pool, :active_connection?

        main_thread.close

        assert_not_predicate pool, :active_connection?
      end

      def test_full_pool_exception
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        @pool.size.times { assert @pool.checkout }

        assert_raises(ConnectionTimeoutError) do
          @pool.checkout
        end
      end

      def test_full_pool_blocks
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
        Thread.pass until @pool.num_waiting_in_queue == 1

        connection = cs.first
        connection.close
        assert_equal connection, t.join.value
      end

      def test_full_pool_blocking_shares_load_interlock
        @pool.instance_variable_set(:@size, 1)

        load_interlock_latch = Concurrent::CountDownLatch.new
        connection_latch = Concurrent::CountDownLatch.new

        able_to_get_connection = false
        able_to_load = false

        thread_with_load_interlock = Thread.new do
          ActiveSupport::Dependencies.interlock.running do
            load_interlock_latch.count_down
            connection_latch.wait

            @pool.with_connection do
              able_to_get_connection = true
            end
          end
        end

        thread_with_last_connection = Thread.new do
          @pool.with_connection do
            connection_latch.count_down
            load_interlock_latch.wait

            ActiveSupport::Dependencies.interlock.loading do
              able_to_load = true
            end
          end
        end

        thread_with_load_interlock.join
        thread_with_last_connection.join

        assert able_to_get_connection
        assert able_to_load
      end

      def test_removing_releases_latch
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
        Thread.pass until @pool.num_waiting_in_queue == 1

        connection = cs.first
        @pool.remove connection
        assert_respond_to t.join.value, :execute
        connection.close
      end

      def test_reap_and_active
        @pool.checkout
        @pool.checkout
        @pool.checkout

        connections = @pool.connections.dup

        @pool.reap

        assert_equal connections.length, @pool.connections.length
      end

      def test_reap_inactive
        ready = Concurrent::CountDownLatch.new
        @pool.checkout
        child = Thread.new do
          @pool.checkout
          @pool.checkout
          ready.count_down
          Thread.stop
        end
        ready.wait

        assert_equal 3, active_connections(@pool).size

        child.terminate
        child.join
        @pool.reap

        assert_equal 1, active_connections(@pool).size
      ensure
        @pool.connections.each { |conn| conn.close if conn.in_use? }
      end

      def test_idle_timeout_configuration
        @pool.disconnect!
        spec = ActiveRecord::Base.connection_pool.spec
        spec.config.merge!(idle_timeout: "0.02")
        @pool = ConnectionPool.new(spec)
        idle_conn = @pool.checkout
        @pool.checkin(idle_conn)

        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 0.01
        )

        @pool.flush
        assert_equal 1, @pool.connections.length

        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 0.02
        )

        @pool.flush
        assert_equal 0, @pool.connections.length
      end

      def test_disable_flush
        @pool.disconnect!
        spec = ActiveRecord::Base.connection_pool.spec
        spec.config.merge!(idle_timeout: -5)
        @pool = ConnectionPool.new(spec)
        idle_conn = @pool.checkout
        @pool.checkin(idle_conn)

        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 1
        )

        @pool.flush
        assert_equal 1, @pool.connections.length
      end

      def test_flush
        idle_conn = @pool.checkout
        recent_conn = @pool.checkout
        active_conn = @pool.checkout

        @pool.checkin idle_conn
        @pool.checkin recent_conn

        assert_equal 3, @pool.connections.length

        idle_conn.instance_variable_set(
          :@idle_since,
          Concurrent.monotonic_time - 1000
        )

        @pool.flush(30)

        assert_equal 2, @pool.connections.length

        assert_equal [recent_conn, active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__)
      ensure
        @pool.checkin active_conn
      end

      def test_flush_bang
        idle_conn = @pool.checkout
        recent_conn = @pool.checkout
        active_conn = @pool.checkout
        _dead_conn = Thread.new { @pool.checkout }.join

        @pool.checkin idle_conn
        @pool.checkin recent_conn

        assert_equal 4, @pool.connections.length

        def idle_conn.seconds_idle
          1000
        end

        @pool.flush!

        assert_equal 1, @pool.connections.length

        assert_equal [active_conn].sort_by(&:__id__), @pool.connections.sort_by(&:__id__)
      ensure
        @pool.checkin active_conn
      end

      def test_remove_connection
        conn = @pool.checkout
        assert_predicate conn, :in_use?

        length = @pool.connections.length
        @pool.remove conn
        assert_predicate conn, :in_use?
        assert_equal(length - 1, @pool.connections.length)
      ensure
        conn.close
      end

      def test_remove_connection_for_thread
        conn = @pool.connection
        @pool.remove conn
        assert_not_equal(conn, @pool.connection)
      ensure
        conn.close if conn
      end

      def test_active_connection?
        assert_not_predicate @pool, :active_connection?
        assert @pool.connection
        assert_predicate @pool, :active_connection?
        @pool.release_connection
        assert_not_predicate @pool, :active_connection?
      end

      def test_checkout_behaviour
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        main_connection = pool.connection
        assert_not_nil main_connection
        threads = []
        4.times do |i|
          threads << Thread.new(i) do
            thread_connection = pool.connection
            assert_not_nil thread_connection
            thread_connection.close
          end
        end

        threads.each(&:join)

        Thread.new do
          assert pool.connection
          pool.connection.close
        end.join
      end

      def test_checkout_order_is_lifo
        conn1 = @pool.checkout
        conn2 = @pool.checkout
        @pool.checkin conn1
        @pool.checkin conn2
        assert_equal [conn2, conn1], 2.times.map { @pool.checkout }
      end

      # The connection pool is "fair" if threads waiting for
      # connections receive them in the order in which they began
      # waiting.  This ensures that we don't timeout one HTTP request
      # even while well under capacity in a multi-threaded environment
      # such as a Java servlet container.
      #
      # We don't need strict fairness: if two connections become
      # available at the same time, it's fine if two threads that were
      # waiting acquire the connections out of order.
      #
      # Thus this test prepares waiting threads and then trickles in
      # available connections slowly, ensuring the wakeup order is
      # correct in this case.
      def test_checkout_fairness
        @pool.instance_variable_set(:@size, 10)
        expected = (1..@pool.size).to_a.freeze
        # check out all connections so our threads start out waiting
        conns = expected.map { @pool.checkout }
        mutex = Mutex.new
        order = []
        errors = []

        threads = expected.map do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { order << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # this should wake up the waiting threads one by one in order
        conns.each { |conn| @pool.checkin(conn); sleep 0.1 }

        threads.each(&:join)

        raise errors.first if errors.any?

        assert_equal(expected, order)
      end

      # As mentioned in #test_checkout_fairness, we don't care about
      # strict fairness.  This test creates two groups of threads:
      # group1 whose members all start waiting before any thread in
      # group2.  Enough connections are checked in to wakeup all
      # group1 threads, and the fact that only group1 and no group2
      # threads acquired a connection is enforced.
      def test_checkout_fairness_by_group
        @pool.instance_variable_set(:@size, 10)
        # take all the connections
        conns = (1..10).map { @pool.checkout }
        mutex = Mutex.new
        successes = []    # threads that successfully got a connection
        errors = []

        make_thread = proc do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { successes << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # all group1 threads start waiting before any in group2
        group1 = (1..5).map(&make_thread)
        group2 = (6..10).map(&make_thread)

        # checkin n connections back to the pool
        checkin = proc do |n|
          n.times do
            c = conns.pop
            @pool.checkin(c)
          end
        end

        checkin.call(group1.size)         # should wake up all group1

        loop do
          sleep 0.1
          break if mutex.synchronize { (successes.size + errors.size) == group1.size }
        end

        winners = mutex.synchronize { successes.dup }
        checkin.call(group2.size)         # should wake up everyone remaining

        group1.each(&:join)
        group2.each(&:join)

        assert_equal((1..group1.size).to_a, winners.sort)

        if errors.any?
          raise errors.first
        end
      end

      def test_automatic_reconnect_restores_after_disconnect
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        assert pool.automatic_reconnect
        assert pool.connection

        pool.disconnect!
        assert pool.connection
      end

      def test_automatic_reconnect_can_be_disabled
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        pool.disconnect!
        pool.automatic_reconnect = false

        assert_raises(ConnectionNotEstablished) do
          pool.connection
        end

        assert_raises(ConnectionNotEstablished) do
          pool.with_connection
        end
      end

      def test_pool_sets_connection_visitor
        assert @pool.connection.visitor.is_a?(Arel::Visitors::ToSql)
      end

      # make sure exceptions are thrown when establish_connection
      # is called with an anonymous class
      def test_anonymous_class_exception
        anonymous = Class.new(ActiveRecord::Base)

        assert_raises(RuntimeError) do
          anonymous.establish_connection
        end
      end

      class ConnectionTestModel < ActiveRecord::Base
      end

      def test_connection_notification_is_called
        payloads = []
        subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |name, started, finished, unique_id, payload|
          payloads << payload
        end
        ConnectionTestModel.establish_connection :arunit

        assert_equal [:config, :connection_id, :spec_name], payloads[0].keys.sort
        assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", payloads[0][:spec_name]
      ensure
        ActiveSupport::Notifications.unsubscribe(subscription) if subscription
      end

      def test_pool_sets_connection_schema_cache
        connection = pool.checkout
        schema_cache = SchemaCache.new connection
        schema_cache.add(:posts)
        pool.schema_cache = schema_cache

        pool.with_connection do |conn|
          assert_not_same pool.schema_cache, conn.schema_cache
          assert_equal pool.schema_cache.size, conn.schema_cache.size
          assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts)
        end

        pool.checkin connection
      end

      def test_concurrent_connection_establishment
        assert_operator @pool.connections.size, :<=, 1

        all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size)
        all_go                        = Concurrent::CountDownLatch.new

        @pool.singleton_class.class_eval do
          define_method(:new_connection) do
            all_threads_in_new_connection.count_down
            all_go.wait
            super()
          end
        end

        connecting_threads = []
        @pool.size.times do
          connecting_threads << Thread.new { @pool.checkout }
        end

        begin
          Timeout.timeout(5) do
            # the kernel of the whole test is here, everything else is just scaffolding,
            # this latch will not be released unless conn. pool allows for concurrent
            # connection creation
            all_threads_in_new_connection.wait
          end
        rescue Timeout::Error
          flunk "pool unable to establish connections concurrently or implementation has " \
                "changed, this test then needs to patch a different :new_connection method"
        ensure
          # clean up the threads
          all_go.count_down
          connecting_threads.map(&:join)
        end
      end

      def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
        Thread.report_on_exception, original_report_on_exception = false, Thread.report_on_exception
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect, :clear_reloadable_connections].each do |group_action_method|
          @pool.with_connection do |connection|
            assert_raises(ExclusiveConnectionTimeoutError) do
              Thread.new { @pool.send(group_action_method) }.join
            end
          end
        end
      ensure
        Thread.report_on_exception = original_report_on_exception
      end

      def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
        [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
          begin
            thread = timed_join_result = nil
            @pool.with_connection do |connection|
              thread = Thread.new { @pool.send(group_action_method) }

              # give the other `thread` some time to get stuck in `group_action_method`
              timed_join_result = thread.join(0.3)
              # thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to
              # release our connection
              assert_nil timed_join_result

              # assert that since this is within default timeout our connection hasn't been forcefully taken away from us
              assert_predicate @pool, :active_connection?
            end
          ensure
            thread.join if thread && !timed_join_result # clean up the other thread
          end
        end
      end

      def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_acquire_all_connections_proceed_anyway
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
          @pool.with_connection do |connection|
            Thread.new { @pool.send(group_action_method) }.join
            # assert connection has been forcefully taken away from us
            assert_not_predicate @pool, :active_connection?

            # make a new connection for with_connection to clean up
            @pool.connection
          end
        end
      end

      def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
        with_single_connection_pool do |pool|
          [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
            conn               = pool.connection # drain the only available connection
            second_thread_done = Concurrent::Event.new

            begin
              # create a first_thread and let it get into the FIFO queue first
              first_thread = Thread.new do
                pool.with_connection { second_thread_done.wait }
              end

              # wait for first_thread to get in queue
              Thread.pass until pool.num_waiting_in_queue == 1

              # create a different, later thread, that will attempt to do a "group action",
              # but because of the group action semantics it should be able to preempt the
              # first_thread when a connection is made available
              second_thread = Thread.new do
                pool.send(group_action_method)
                second_thread_done.set
              end

              # wait for second_thread to get in queue
              Thread.pass until pool.num_waiting_in_queue == 2

              # return the only available connection
              pool.checkin(conn)

              # if the second_thread is not able to preempt the first_thread,
              # they will temporarily (until either of them timeouts with ConnectionTimeoutError)
              # deadlock and a join(2) timeout will be reached
              assert second_thread.join(2), "#{group_action_method} is not able to preempt other waiting threads"

            ensure
              # post test clean up
              failed = !second_thread_done.set?

              if failed
                second_thread_done.set

                first_thread.join(2)
                second_thread.join(2)
              end

              first_thread.join(10) || raise("first_thread got stuck")
              second_thread.join(10) || raise("second_thread got stuck")
            end
          end
        end
      end

      def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
        with_single_connection_pool do |pool|
          conn = pool.connection # drain the only available connection
          def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
            true
          end

          stuck_thread = Thread.new do
            pool.with_connection { }
          end

          # wait for stuck_thread to get in queue
          Thread.pass until pool.num_waiting_in_queue == 1

          pool.clear_reloadable_connections

          unless stuck_thread.join(2)
            flunk "clear_reloadable_connections must not let other connection waiting threads get stuck in queue"
          end

          assert_equal 0, pool.num_waiting_in_queue
        end
      end

      def test_connection_pool_stat
        with_single_connection_pool do |pool|
          pool.with_connection do |connection|
            stats = pool.stat
            assert_equal({ size: 1, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }, stats)
          end

          stats = pool.stat
          assert_equal({ size: 1, connections: 1, busy: 0, dead: 0, idle: 1, waiting: 0, checkout_timeout: 5 }, stats)

          Thread.new do
            pool.checkout
            Thread.current.kill
          end.join

          stats = pool.stat
          assert_equal({ size: 1, connections: 1, busy: 0, dead: 1, idle: 0, waiting: 0, checkout_timeout: 5 }, stats)
        end
      end

      private
        def with_single_connection_pool
          one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
          one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
          yield(pool = ConnectionPool.new(one_conn_spec))
        ensure
          pool.disconnect! if pool
        end
    end
  end
end