From 0693e079708a52b777f2b7872b8e3d467b880a0d Mon Sep 17 00:00:00 2001
From: Jonathan Rochkind <jonathan@dnil.net>
Date: Mon, 17 Sep 2012 17:06:14 -0400
Subject: backport fair connection pool 02b2335563 to 3-2-stable

---
 activerecord/CHANGELOG.md                          |   5 +
 .../abstract/connection_pool.rb                    | 239 ++++++++++++++++-----
 .../connection_adapters/abstract_adapter_test.rb   |   2 +-
 activerecord/test/cases/connection_pool_test.rb    | 104 +++++++++
 4 files changed, 300 insertions(+), 50 deletions(-)

diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md
index b3c1b034d7..5133438e32 100644
--- a/activerecord/CHANGELOG.md
+++ b/activerecord/CHANGELOG.md
@@ -1,5 +1,10 @@
 ## Rails 3.2.9 (unreleased)
 
+*   Make ActiveRecord::ConnectionPool 'fair': the first thread waiting is the
+    first thread given a newly available connection. Backport of #6492 02b2335563
+
+    *jrochkind*
+
 *   Fix creation of through association models when using `collection=[]`
     on a `has_many :through` association from an unsaved model.
     Fix #7661.
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index d4649102df..235efc12f6 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -57,10 +57,141 @@ module ActiveRecord
     # * +wait_timeout+: number of seconds to block and wait for a connection
     #   before giving up and raising a timeout error (default 5 seconds).
     class ConnectionPool
+
+      # Threadsafe, fair, FIFO queue.  Meant to be used by ConnectionPool
+      # with which it shares a Monitor.  But could be a generic Queue.
+      #
+      # The Queue in stdlib's 'thread' could replace this class except
+      # stdlib's doesn't support waiting with a timeout.
+      class Queue
+        def initialize(lock = Monitor.new)
+          @lock = lock
+          @cond = @lock.new_cond
+          @num_waiting = 0
+          @queue = []
+        end
+
+        # Test if any threads are currently waiting on the queue.
+        def any_waiting?
+          synchronize do
+            @num_waiting > 0
+          end
+        end
+
+        # Return the number of threads currently waiting on this
+        # queue.
+        def num_waiting
+          synchronize do
+            @num_waiting
+          end
+        end
+
+        # Add +element+ to the queue.  Never blocks.
+        def add(element)
+          synchronize do
+            @queue.push element
+            @cond.signal
+          end
+        end
+
+        # If +element+ is in the queue, remove and return it, or nil.
+        def delete(element)
+          synchronize do
+            @queue.delete(element)
+          end
+        end
+
+        # Remove all elements from the queue.
+        def clear
+          synchronize do
+            @queue.clear
+          end
+        end
+
+        # Remove the head of the queue.
+        #
+        # If +timeout+ is not given, remove and return the head of
+        # the queue if the number of available elements is strictly
+        # greater than the number of threads currently waiting (that
+        # is, don't jump ahead in line).  Otherwise, return nil.
+        #
+        # If +timeout+ is given, block if there is no element
+        # available, waiting up to +timeout+ seconds for an element to
+        # become available.
+        #
+        # Raises:
+        # - ConnectionTimeoutError if +timeout+ is given and no element
+        # becomes available after +timeout+ seconds.
+        def poll(timeout = nil)
+          synchronize do
+            if timeout
+              no_wait_poll || wait_poll(timeout)
+            else
+              no_wait_poll
+            end
+          end
+        end
+
+        private
+
+        def synchronize(&block)
+          @lock.synchronize(&block)
+        end
+
+        # Test if the queue currently contains any elements.
+        def any?
+          !@queue.empty?
+        end
+
+        # A thread can remove an element from the queue without
+        # waiting if and only if the number of currently available
+        # connections is strictly greater than the number of waiting
+        # threads.
+        def can_remove_no_wait?
+          @queue.size > @num_waiting
+        end
+
+        # Removes and returns the head of the queue if possible, or nil.
+        def remove
+          @queue.shift
+        end
+
+        # Remove and return the head of the queue if the number of
+        # available elements is strictly greater than the number of
+        # threads currently waiting.  Otherwise, return nil.
+        def no_wait_poll
+          remove if can_remove_no_wait?
+        end
+
+        # Waits up to +timeout+ seconds for an element, then removes and
+        # returns it.  Raises ConnectionTimeoutError if none arrives in time.
+        def wait_poll(timeout)
+          @num_waiting += 1
+
+          t0 = Time.now
+          elapsed = 0
+          loop do
+            @cond.wait(timeout - elapsed)
+
+            return remove if any?
+
+            elapsed = Time.now - t0
+
+            if elapsed >= timeout
+              msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
+                [timeout, elapsed]
+              raise ConnectionTimeoutError, msg
+            end
+          end
+        ensure
+          @num_waiting -= 1
+        end
+      end
+
       include MonitorMixin
 
       attr_accessor :automatic_reconnect
-      attr_reader :spec, :connections
+      attr_reader :spec, :connections, :size
 
       # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
       # object which describes database connection information (e.g. adapter,
@@ -76,7 +207,6 @@ module ActiveRecord
         # The cache of reserved connections mapped to threads
         @reserved_connections = {}
 
-        @queue = new_cond
         @timeout = spec.config[:wait_timeout] || 5
 
         # default max pool size to 5
@@ -84,6 +214,16 @@ module ActiveRecord
 
         @connections         = []
         @automatic_reconnect = true
+
+        @available = Queue.new self
+      end
+
+      # Hack for tests to be able to add connections.  Do not call outside of tests
+      def insert_connection_for_test!(c) #:nodoc:
+        synchronize do
+          @connections << c
+          @available.add c
+        end
       end
 
       # Retrieve the connection associated with the current thread, or call
@@ -139,6 +279,7 @@ module ActiveRecord
             conn.disconnect!
           end
           @connections = []
+          @available.clear
         end
       end
 
@@ -150,9 +291,15 @@ module ActiveRecord
             checkin conn
             conn.disconnect! if conn.requires_reloading?
           end
+
           @connections.delete_if do |conn|
             conn.requires_reloading?
           end
+          @available.clear
+          @connections.each do |conn|
+            @available.add conn
+          end
+          
         end
       end
 
@@ -216,58 +363,22 @@ connection.  For example: ActiveRecord::Base.connection.close
       # Check-out a database connection from the pool, indicating that you want
       # to use it. You should call #checkin when you no longer need this.
       #
-      # This is done by either returning an existing connection, or by creating
-      # a new connection. If the maximum number of connections for this pool has
-      # already been reached, but the pool is empty (i.e. they're all being used),
-      # then this method will wait until a thread has checked in a connection.
-      # The wait time is bounded however: if no connection can be checked out
-      # within the timeout specified for this pool, then a ConnectionTimeoutError
-      # exception will be raised.
+      # This is done by either returning and leasing an existing connection,
+      # or by creating a new connection and leasing it.
+      #
+      # If all connections are leased and the pool is at capacity, this method
+      # waits up to the pool's +wait_timeout+ for a connection to be checked in;
+      # if none becomes available, a ConnectionTimeoutError is raised.
       #
       # Returns: an AbstractAdapter object.
       #
       # Raises:
-      # - ConnectionTimeoutError: no connection can be obtained from the pool
-      #   within the timeout period.
+      # - ConnectionTimeoutError: no connection became available within the timeout.
       def checkout
         synchronize do
-          waited_time = 0
-
-          loop do
-            conn = @connections.find { |c| c.lease }
-
-            unless conn
-              if @connections.size < @size
-                conn = checkout_new_connection
-                conn.lease
-              end
-            end
-
-            if conn
-              checkout_and_verify conn
-              return conn
-            end
-
-            if waited_time >= @timeout
-              raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it."
-            end
-
-            # Sometimes our wait can end because a connection is available,
-            # but another thread can snatch it up first. If timeout hasn't
-            # passed but no connection is avail, looks like that happened --
-            # loop and wait again, for the time remaining on our timeout. 
-            before_wait = Time.now
-            @queue.wait( [@timeout - waited_time, 0].max )
-            waited_time += (Time.now - before_wait)
-
-            # Will go away in Rails 4, when we don't clean up
-            # after leaked connections automatically anymore. Right now, clean
-            # up after we've returned from a 'wait' if it looks like it's
-            # needed, then loop and try again. 
-            if(active_connections.size >= @connections.size)
-              clear_stale_cached_connections!
-            end
-          end
+          conn = acquire_connection
+          conn.lease
+          checkout_and_verify(conn)
         end
       end
 
@@ -280,10 +391,40 @@ connection.  For example: ActiveRecord::Base.connection.close
         synchronize do
           conn.run_callbacks :checkin do
             conn.expire
-            @queue.signal
           end
 
           release conn
+
+          @available.add conn
+        end
+      end
+
+      # Acquire a connection by one of 1) immediately removing one
+      # from the queue of available connections, 2) creating a new
+      # connection if the pool is not at capacity, 3) waiting on the
+      # queue for a connection to become available (first calling
+      # clear_stale_cached_connections! to clean up leaked connections;
+      # this cleanup will probably go away in Rails 4).
+      #
+      # Raises:
+      # - ConnectionTimeoutError if a connection could not be acquired
+      def acquire_connection
+        if conn = @available.poll
+          conn
+        elsif @connections.size < @size
+          checkout_new_connection
+        else
+          # this conditional clear_stale will go away in Rails 4, when we don't
+          # clean up after leaked connections automatically anymore. Right now,
+          # clean up leaked connections, if it looks like it's needed, before
+          # waiting for a connection to become available.
+          synchronize do
+            if(active_connections.size >= @connections.size)
+              clear_stale_cached_connections!
+            end
+          end
+
+          @available.poll(@timeout)
         end
       end
 
diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
index 7af9079b48..3dcde699fc 100644
--- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
+++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
@@ -36,7 +36,7 @@ module ActiveRecord
 
       def test_close
         pool = ConnectionPool.new(Base::ConnectionSpecification.new({}, nil))
-        pool.connections << adapter
+        pool.insert_connection_for_test! adapter
         adapter.pool = pool
 
         # Make sure the pool marks the connection in use
diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb
index 5cecfa90e7..2ada2c1fe1 100644
--- a/activerecord/test/cases/connection_pool_test.rb
+++ b/activerecord/test/cases/connection_pool_test.rb
@@ -128,6 +128,110 @@ module ActiveRecord
 
       end
 
+      # The connection pool is "fair" if threads waiting for
+      # connections receive them in the order in which they began
+      # waiting.  This ensures that we don't timeout one HTTP request
+      # even while well under capacity in a multi-threaded environment
+      # such as a Java servlet container.
+      #
+      # We don't need strict fairness: if two connections become
+      # available at the same time, it's fine if two threads that were
+      # waiting acquire the connections out of order.
+      #
+      # Thus this test prepares waiting threads and then trickles in
+      # available connections slowly, ensuring the wakeup order is
+      # correct in this case.
+      def test_checkout_fairness
+        @pool.instance_variable_set(:@size, 10)
+        expected = (1..@pool.size).to_a.freeze
+        # check out all connections so our threads start out waiting
+        conns = expected.map { @pool.checkout }
+        mutex = Mutex.new
+        order = []
+        errors = []
+
+        threads = expected.map do |i|
+          t = Thread.new {
+            begin              
+              @pool.checkout # connection return value never checked back in
+              mutex.synchronize { order << i }
+            rescue => e
+              mutex.synchronize { errors << e }
+            end
+          }
+          Thread.pass until t.status == "sleep"
+          t
+        end
+
+        # this should wake up the waiting threads one by one in order
+        conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
+
+        threads.each(&:join)
+
+        raise errors.first if errors.any?
+
+        assert_equal(expected, order)
+      end
+
+      # As mentioned in #test_checkout_fairness, we don't care about
+      # strict fairness.  This test creates two groups of threads:
+      # group1 whose members all start waiting before any thread in
+      # group2.  Enough connections are checked in to wake up all
+      # group1 threads, and the fact that only group1 and no group2
+      # threads acquired a connection is enforced.
+      def test_checkout_fairness_by_group
+        @pool.instance_variable_set(:@size, 10)
+        # take all the connections
+        conns = (1..10).map { @pool.checkout }
+        mutex = Mutex.new
+        successes = []    # threads that successfully got a connection
+        errors = []
+
+        make_thread = proc do |i|
+          t = Thread.new {
+            begin
+              @pool.checkout # connection return value never checked back in
+              mutex.synchronize { successes << i }
+            rescue => e
+              mutex.synchronize { errors << e }
+            end
+          }
+          Thread.pass until t.status == "sleep"
+          t
+        end
+
+        # all group1 threads start waiting before any in group2
+        group1 = (1..5).map(&make_thread)
+        group2 = (6..10).map(&make_thread)
+
+        # checkin n connections back to the pool
+        checkin = proc do |n|
+          n.times do
+            c = conns.pop
+            @pool.checkin(c)
+          end
+        end
+
+        checkin.call(group1.size)         # should wake up all group1
+
+        loop do
+          sleep 0.1
+          break if mutex.synchronize { (successes.size + errors.size) == group1.size }
+        end
+
+        winners = mutex.synchronize { successes.dup }
+        checkin.call(group2.size)         # should wake up everyone remaining
+
+        group1.each(&:join)
+        group2.each(&:join)
+
+        assert_equal((1..group1.size).to_a, winners.sort)
+
+        if errors.any?
+          raise errors.first
+        end
+      end
+
       def test_automatic_reconnect=
         pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
         assert pool.automatic_reconnect
-- 
cgit v1.2.3