aboutsummaryrefslogblamecommitdiffstats
path: root/activerecord/test/cases/connection_pool_test.rb
blob: c905772193bf9d14d7f1043826caa9aac8bfab16 (plain) (tree)
1
2
3
4
5
6
7
8
9
                      
                                          



                                                     

                       
               

             

                                                                          









                                                                                          

         
                 
                         

         



                                            









                                      

















                                                        




























                                                         
                                  
                                           
                                                
                        


           




                                                        
                                                         










                                                        
                                                         



                                                
                        

         
                              


                      




                                           



                                                                 
                                                     
                      


                             
                       
                     
           
                   

                                                      
 

                       

                  
                                                      
            
                                       

         











                                                          




                                                
                          

         







                                        





                                                                         
                                     

                                        
                            

             
 
                            
 
                     
                                

                               
         
 
                                                            
                                                                 




                                                                      
                                                                      
















                                                                    
                                                    




                                               
                                                           





























                                                                      
                                                    




                                                  
                                                           


































                                                                                      


















                                                                         



                                                                    
 
                                                                 
                                         


                                                       

                                     


                                                     














                                                                                          

                                                  


                                                                                                                  






























                                                                                             































































































































                                                                                                                                  


       
require "cases/helper"
require 'active_support/concurrency/latch'

module ActiveRecord
  module ConnectionAdapters
    class ConnectionPoolTest < ActiveRecord::TestCase
      attr_reader :pool

      def setup
        super

        # Keep a duplicate pool so we do not bother others
        @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec

        if in_memory_db?
          # Separate connections to an in-memory database create an entirely new database,
          # with an empty schema etc, so we just stub out this schema on the fly.
          @pool.with_connection do |connection|
            connection.create_table :posts do |t|
              t.integer :cololumn
            end
          end
        end
      end

      teardown do
        @pool.disconnect!
      end

      def active_connections(pool)
        pool.connections.find_all(&:in_use?)
      end

      def test_checkout_after_close
        connection = pool.connection
        assert connection.in_use?

        connection.close
        assert !connection.in_use?

        assert pool.connection.in_use?
      end

      def test_released_connection_moves_between_threads
        thread_conn = nil

        Thread.new {
          pool.with_connection do |conn|
            thread_conn = conn
          end
        }.join

        assert thread_conn

        Thread.new {
          pool.with_connection do |conn|
            assert_equal thread_conn, conn
          end
        }.join
      end

      def test_with_connection
        assert_equal 0, active_connections(pool).size

        main_thread = pool.connection
        assert_equal 1, active_connections(pool).size

        Thread.new {
          pool.with_connection do |conn|
            assert conn
            assert_equal 2, active_connections(pool).size
          end
          assert_equal 1, active_connections(pool).size
        }.join

        main_thread.close
        assert_equal 0, active_connections(pool).size
      end

      def test_active_connection_in_use
        assert !pool.active_connection?
        main_thread = pool.connection

        assert pool.active_connection?

        main_thread.close

        assert !pool.active_connection?
      end

      def test_full_pool_exception
        @pool.size.times { @pool.checkout }
        assert_raises(ConnectionTimeoutError) do
          @pool.checkout
        end
      end

      def test_full_pool_blocks
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
        Thread.pass until @pool.num_waiting_in_queue == 1

        connection = cs.first
        connection.close
        assert_equal connection, t.join.value
      end

      def test_removing_releases_latch
        cs = @pool.size.times.map { @pool.checkout }
        t = Thread.new { @pool.checkout }

        # make sure our thread is in the timeout section
        Thread.pass until @pool.num_waiting_in_queue == 1

        connection = cs.first
        @pool.remove connection
        assert_respond_to t.join.value, :execute
        connection.close
      end

      def test_reap_and_active
        @pool.checkout
        @pool.checkout
        @pool.checkout

        connections = @pool.connections.dup

        @pool.reap

        assert_equal connections.length, @pool.connections.length
      end

      def test_reap_inactive
        ready = ActiveSupport::Concurrency::Latch.new
        @pool.checkout
        child = Thread.new do
          @pool.checkout
          @pool.checkout
          ready.release
          Thread.stop
        end
        ready.await

        assert_equal 3, active_connections(@pool).size

        child.terminate
        child.join
        @pool.reap

        assert_equal 1, active_connections(@pool).size
      ensure
        @pool.connections.each(&:close)
      end

      def test_remove_connection
        conn = @pool.checkout
        assert conn.in_use?

        length = @pool.connections.length
        @pool.remove conn
        assert conn.in_use?
        assert_equal(length - 1, @pool.connections.length)
      ensure
        conn.close
      end

      def test_remove_connection_for_thread
        conn = @pool.connection
        @pool.remove conn
        assert_not_equal(conn, @pool.connection)
      ensure
        conn.close if conn
      end

      def test_active_connection?
        assert !@pool.active_connection?
        assert @pool.connection
        assert @pool.active_connection?
        @pool.release_connection
        assert !@pool.active_connection?
      end

      def test_checkout_behaviour
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        connection = pool.connection
        assert_not_nil connection
        threads = []
        4.times do |i|
          threads << Thread.new(i) do
            connection = pool.connection
            assert_not_nil connection
            connection.close
          end
        end

        threads.each(&:join)

        Thread.new do
          assert pool.connection
          pool.connection.close
        end.join
      end

      # The connection pool is "fair" if threads waiting for
      # connections receive them in the order in which they began
      # waiting.  This ensures that we don't timeout one HTTP request
      # even while well under capacity in a multi-threaded environment
      # such as a Java servlet container.
      #
      # We don't need strict fairness: if two connections become
      # available at the same time, it's fine if two threads that were
      # waiting acquire the connections out of order.
      #
      # Thus this test prepares waiting threads and then trickles in
      # available connections slowly, ensuring the wakeup order is
      # correct in this case.
      def test_checkout_fairness
        @pool.instance_variable_set(:@size, 10)
        expected = (1..@pool.size).to_a.freeze
        # check out all connections so our threads start out waiting
        conns = expected.map { @pool.checkout }
        mutex = Mutex.new
        order = []
        errors = []

        threads = expected.map do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { order << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # this should wake up the waiting threads one by one in order
        conns.each { |conn| @pool.checkin(conn); sleep 0.1 }

        threads.each(&:join)

        raise errors.first if errors.any?

        assert_equal(expected, order)
      end

      # As mentioned in #test_checkout_fairness, we don't care about
      # strict fairness.  This test creates two groups of threads:
      # group1 whose members all start waiting before any thread in
      # group2.  Enough connections are checked in to wakeup all
      # group1 threads, and the fact that only group1 and no group2
      # threads acquired a connection is enforced.
      def test_checkout_fairness_by_group
        @pool.instance_variable_set(:@size, 10)
        # take all the connections
        conns = (1..10).map { @pool.checkout }
        mutex = Mutex.new
        successes = []    # threads that successfully got a connection
        errors = []

        make_thread = proc do |i|
          t = Thread.new {
            begin
              @pool.checkout # never checked back in
              mutex.synchronize { successes << i }
            rescue => e
              mutex.synchronize { errors << e }
            end
          }
          Thread.pass until @pool.num_waiting_in_queue == i
          t
        end

        # all group1 threads start waiting before any in group2
        group1 = (1..5).map(&make_thread)
        group2 = (6..10).map(&make_thread)

        # checkin n connections back to the pool
        checkin = proc do |n|
          n.times do
            c = conns.pop
            @pool.checkin(c)
          end
        end

        checkin.call(group1.size)         # should wake up all group1

        loop do
          sleep 0.1
          break if mutex.synchronize { (successes.size + errors.size) == group1.size }
        end

        winners = mutex.synchronize { successes.dup }
        checkin.call(group2.size)         # should wake up everyone remaining

        group1.each(&:join)
        group2.each(&:join)

        assert_equal((1..group1.size).to_a, winners.sort)

        if errors.any?
          raise errors.first
        end
      end

      def test_automatic_reconnect=
        pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
        assert pool.automatic_reconnect
        assert pool.connection

        pool.disconnect!
        assert pool.connection

        pool.disconnect!
        pool.automatic_reconnect = false

        assert_raises(ConnectionNotEstablished) do
          pool.connection
        end

        assert_raises(ConnectionNotEstablished) do
          pool.with_connection
        end
      end

      def test_pool_sets_connection_visitor
        assert @pool.connection.visitor.is_a?(Arel::Visitors::ToSql)
      end

      # make sure exceptions are thrown when establish_connection
      # is called with an anonymous class
      def test_anonymous_class_exception
        anonymous = Class.new(ActiveRecord::Base)
        handler = ActiveRecord::Base.connection_handler

        assert_raises(RuntimeError) {
          handler.establish_connection anonymous, nil
        }
      end

      def test_pool_sets_connection_schema_cache
        connection = pool.checkout
        schema_cache = SchemaCache.new connection
        schema_cache.add(:posts)
        pool.schema_cache = schema_cache

        pool.with_connection do |conn|
          assert_not_same pool.schema_cache, conn.schema_cache
          assert_equal pool.schema_cache.size, conn.schema_cache.size
          assert_same pool.schema_cache.columns(:posts), conn.schema_cache.columns(:posts)
        end

        pool.checkin connection
      end

      def test_concurrent_connection_establishment
        assert_operator @pool.connections.size, :<=, 1

        all_threads_in_new_connection = ActiveSupport::Concurrency::Latch.new(@pool.size - @pool.connections.size)
        all_go                        = ActiveSupport::Concurrency::Latch.new

        @pool.singleton_class.class_eval do
          define_method(:new_connection) do
            all_threads_in_new_connection.release
            all_go.await
            super()
          end
        end

        connecting_threads = []
        @pool.size.times do
          connecting_threads << Thread.new { @pool.checkout }
        end

        begin
          Timeout.timeout(5) do
            # the kernel of the whole test is here, everything else is just scaffolding,
            # this latch will not be released unless conn. pool allows for concurrent
            # connection creation
            all_threads_in_new_connection.await
          end
        rescue Timeout::Error
          flunk 'pool unable to establish connections concurrently or implementation has ' <<
                'changed, this test then needs to patch a different :new_connection method'
        ensure
          # clean up the threads
          all_go.release
          connecting_threads.map(&:join)
        end
      end

      def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect, :clear_reloadable_connections].each do |group_action_method|
          @pool.with_connection do |connection|
            assert_raises(ExclusiveConnectionTimeoutError) do
              Thread.new { @pool.send(group_action_method) }.join
            end
          end
        end
      end

      def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
        [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
          begin
            thread = timed_join_result = nil
            @pool.with_connection do |connection|
              thread = Thread.new { @pool.send(group_action_method) }

              # give the other `thread` some time to get stuck in `group_action_method`
              timed_join_result = thread.join(0.3)
              # thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to
              # release our connection
              assert_nil timed_join_result

              # assert that since this is within default timeout our connection hasn't been forcefully taken away from us
              assert @pool.active_connection?
            end
          ensure
            thread.join if thread && !timed_join_result # clean up the other thread
          end
        end
      end

      def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_aquire_all_connections_proceed_anyway
        @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
        [:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
          @pool.with_connection do |connection|
            Thread.new { @pool.send(group_action_method) }.join
            # assert connection has been forcefully taken away from us
            assert_not @pool.active_connection?
          end
        end
      end

      def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
        with_single_connection_pool do |pool|
          [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
            conn               = pool.connection # drain the only available connection
            second_thread_done = ActiveSupport::Concurrency::Latch.new

            # create a first_thread and let it get into the FIFO queue first
            first_thread = Thread.new do
              pool.with_connection { second_thread_done.await }
            end

            # wait for first_thread to get in queue
            Thread.pass until pool.num_waiting_in_queue == 1

            # create a different, later thread, that will attempt to do a "group action",
            # but because of the group action semantics it should be able to preempt the
            # first_thread when a connection is made available
            second_thread = Thread.new do
              pool.send(group_action_method)
              second_thread_done.release
            end

            # wait for second_thread to get in queue
            Thread.pass until pool.num_waiting_in_queue == 2

            # return the only available connection
            pool.checkin(conn)

            # if the second_thread is not able to preempt the first_thread,
            # they will temporarily (until either of them timeouts with ConnectionTimeoutError)
            # deadlock and a join(2) timeout will be reached
            failed = true unless second_thread.join(2)

            #--- post test clean up start
            second_thread_done.release if failed

            # after `pool.disconnect()` the first thread will be left stuck in queue, no need to wait for
            # it to timeout with ConnectionTimeoutError
            if (group_action_method == :disconnect || group_action_method == :disconnect!) && pool.num_waiting_in_queue > 0
              pool.with_connection {} # create a new connection in case there are threads still stuck in a queue
            end

            first_thread.join
            second_thread.join
            #--- post test clean up end

            flunk "#{group_action_method} is not able to preempt other waiting threads" if failed
          end
        end
      end

      def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
        with_single_connection_pool do |pool|
          conn = pool.connection # drain the only available connection
          def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
            true
          end

          stuck_thread = Thread.new do
            pool.with_connection {}
          end

          # wait for stuck_thread to get in queue
          Thread.pass until pool.num_waiting_in_queue == 1

          pool.clear_reloadable_connections

          unless stuck_thread.join(2)
            flunk 'clear_reloadable_connections must not let other connection waiting threads get stuck in queue'
          end

          assert_equal 0, pool.num_waiting_in_queue
        end
      end

      private
      def with_single_connection_pool
        one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
        one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
        yield(pool = ConnectionPool.new(one_conn_spec))
      ensure
        pool.disconnect! if pool
      end
    end
  end
end