path: root/activerecord
diff options
authorRafael Mendonça França <rafaelmfranca@gmail.com>2012-09-17 15:07:28 -0700
committerRafael Mendonça França <rafaelmfranca@gmail.com>2012-09-17 15:07:28 -0700
commit24a77743b69b2a3452848f7d78afd2692b209a03 (patch)
treee991c2a700c0693268e7712c29ad81417731c4ba /activerecord
parentdd76b3bbc8c4b161e6d4f77012c90960f68d1ac3 (diff)
parent0693e079708a52b777f2b7872b8e3d467b880a0d (diff)
Merge pull request #7675 from jrochkind/fair_conn_pool_backport
backport fair connection pool 02b2335563 to 3-2-stable
Diffstat (limited to 'activerecord')
4 files changed, 300 insertions, 50 deletions
diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md
index b3c1b034d7..5133438e32 100644
--- a/activerecord/CHANGELOG.md
+++ b/activerecord/CHANGELOG.md
@@ -1,5 +1,10 @@
## Rails 3.2.9 (unreleased)
+* Make ActiveRecord::ConnectionPool 'fair', first thread waiting is
+ first thread given newly available connection. Backport of #6492 02b2335563
+ *jrochkind*
* Fix creation of through association models when using `collection=[]`
on a `has_many :through` association from an unsaved model.
Fix #7661.
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index d4649102df..235efc12f6 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -57,10 +57,141 @@ module ActiveRecord
# * +wait_timeout+: number of seconds to block and wait for a connection
# before giving up and raising a timeout error (default 5 seconds).
class ConnectionPool
+ # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
+ # with which it shares a Monitor. But could be a generic Queue.
+ #
+ # The Queue in stdlib's 'thread' could replace this class except
+ # stdlib's doesn't support waiting with a timeout.
+ class Queue
+ def initialize(lock = Monitor.new)
+ @lock = lock
+ @cond = @lock.new_cond
+ @num_waiting = 0
+ @queue = []
+ end
+ # Test if any threads are currently waiting on the queue.
+ def any_waiting?
+ synchronize do
+ @num_waiting > 0
+ end
+ end
+ # Return the number of threads currently waiting on this
+ # queue.
+ def num_waiting
+ synchronize do
+ @num_waiting
+ end
+ end
+ # Add +element+ to the queue. Never blocks.
+ def add(element)
+ synchronize do
+ @queue.push element
+ @cond.signal
+ end
+ end
+ # If +element+ is in the queue, remove and return it, or nil.
+ def delete(element)
+ synchronize do
+ @queue.delete(element)
+ end
+ end
+ # Remove all elements from the queue.
+ def clear
+ synchronize do
+ @queue.clear
+ end
+ end
+ # Remove the head of the queue.
+ #
+ # If +timeout+ is not given, remove and return the head the
+ # queue if the number of available elements is strictly
+ # greater than the number of threads currently waiting (that
+ # is, don't jump ahead in line). Otherwise, return nil.
+ #
+ # If +timeout+ is given, block if it there is no element
+ # available, waiting up to +timeout+ seconds for an element to
+ # become available.
+ #
+ # Raises:
+ # - ConnectionTimeoutError if +timeout+ is given and no element
+ # becomes available after +timeout+ seconds,
+ def poll(timeout = nil)
+ synchronize do
+ if timeout
+ no_wait_poll || wait_poll(timeout)
+ else
+ no_wait_poll
+ end
+ end
+ end
+ private
+ def synchronize(&block)
+ @lock.synchronize(&block)
+ end
+ # Test if the queue currently contains any elements.
+ def any?
+ !@queue.empty?
+ end
+ # A thread can remove an element from the queue without
+ # waiting if an only if the number of currently available
+ # connections is strictly greater than the number of waiting
+ # threads.
+ def can_remove_no_wait?
+ @queue.size > @num_waiting
+ end
+ # Removes and returns the head of the queue if possible, or nil.
+ def remove
+ @queue.shift
+ end
+ # Remove and return the head the queue if the number of
+ # available elements is strictly greater than the number of
+ # threads currently waiting. Otherwise, return nil.
+ def no_wait_poll
+ remove if can_remove_no_wait?
+ end
+ # Waits on the queue up to +timeout+ seconds, then removes and
+ # returns the head of the queue.
+ def wait_poll(timeout)
+ @num_waiting += 1
+ t0 = Time.now
+ elapsed = 0
+ loop do
+ @cond.wait(timeout - elapsed)
+ return remove if any?
+ elapsed = Time.now - t0
+ if elapsed >= timeout
+ msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
+ [timeout, elapsed]
+ raise ConnectionTimeoutError, msg
+ end
+ end
+ ensure
+ @num_waiting -= 1
+ end
+ end
include MonitorMixin
attr_accessor :automatic_reconnect
- attr_reader :spec, :connections
+ attr_reader :spec, :connections, :size
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
@@ -76,7 +207,6 @@ module ActiveRecord
# The cache of reserved connections mapped to threads
@reserved_connections = {}
- @queue = new_cond
@timeout = spec.config[:wait_timeout] || 5
# default max pool size to 5
@@ -84,6 +214,16 @@ module ActiveRecord
@connections = []
@automatic_reconnect = true
+ @available = Queue.new self
+ end
+ # Hack for tests to be able to add connections. Do not call outside of tests
+ def insert_connection_for_test!(c) #:nodoc:
+ synchronize do
+ @connections << c
+ @available.add c
+ end
# Retrieve the connection associated with the current thread, or call
@@ -139,6 +279,7 @@ module ActiveRecord
@connections = []
+ @available.clear
@@ -150,9 +291,15 @@ module ActiveRecord
checkin conn
conn.disconnect! if conn.requires_reloading?
@connections.delete_if do |conn|
+ @available.clear
+ @connections.each do |conn|
+ @available.add conn
+ end
@@ -216,58 +363,22 @@ connection. For example: ActiveRecord::Base.connection.close
# Check-out a database connection from the pool, indicating that you want
# to use it. You should call #checkin when you no longer need this.
- # This is done by either returning an existing connection, or by creating
- # a new connection. If the maximum number of connections for this pool has
- # already been reached, but the pool is empty (i.e. they're all being used),
- # then this method will wait until a thread has checked in a connection.
- # The wait time is bounded however: if no connection can be checked out
- # within the timeout specified for this pool, then a ConnectionTimeoutError
- # exception will be raised.
+ # This is done by either returning and leasing existing connection, or by
+ # creating a new connection and leasing it.
+ #
+ # If all connections are leased and the pool is at capacity (meaning the
+ # number of currently leased connections is greater than or equal to the
+ # size limit set), an ActiveRecord::PoolFullError exception will be raised.
# Returns: an AbstractAdapter object.
# Raises:
- # - ConnectionTimeoutError: no connection can be obtained from the pool
- # within the timeout period.
+ # - PoolFullError: no connection can be obtained from the pool.
def checkout
synchronize do
- waited_time = 0
- loop do
- conn = @connections.find { |c| c.lease }
- unless conn
- if @connections.size < @size
- conn = checkout_new_connection
- conn.lease
- end
- end
- if conn
- checkout_and_verify conn
- return conn
- end
- if waited_time >= @timeout
- raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it."
- end
- # Sometimes our wait can end because a connection is available,
- # but another thread can snatch it up first. If timeout hasn't
- # passed but no connection is avail, looks like that happened --
- # loop and wait again, for the time remaining on our timeout.
- before_wait = Time.now
- @queue.wait( [@timeout - waited_time, 0].max )
- waited_time += (Time.now - before_wait)
- # Will go away in Rails 4, when we don't clean up
- # after leaked connections automatically anymore. Right now, clean
- # up after we've returned from a 'wait' if it looks like it's
- # needed, then loop and try again.
- if(active_connections.size >= @connections.size)
- clear_stale_cached_connections!
- end
- end
+ conn = acquire_connection
+ conn.lease
+ checkout_and_verify(conn)
@@ -280,10 +391,40 @@ connection. For example: ActiveRecord::Base.connection.close
synchronize do
conn.run_callbacks :checkin do
- @queue.signal
release conn
+ @available.add conn
+ end
+ end
+ # Acquire a connection by one of 1) immediately removing one
+ # from the queue of available connections, 2) creating a new
+ # connection if the pool is not at capacity, 3) waiting on the
+ # queue for a connection to become available (first calling
+ # clear_stale_cached_connections! to clean up leaked connections,
+ # this cleanup will prob be going away in Rails4).
+ #
+ # Raises:
+ # - ConnectionTimeoutError if a connection could not be acquired
+ def acquire_connection
+ if conn = @available.poll
+ conn
+ elsif @connections.size < @size
+ checkout_new_connection
+ else
+ # this conditional clear_stale will go away in Rails 4, when we don't
+ # clean up after leaked connections automatically anymore. Right now,
+ # clean up after we've returned from a 'wait' if it looks like it's
+ # needed before trying to wait for a connection.
+ synchronize do
+ if(active_connections.size >= @connections.size)
+ clear_stale_cached_connections!
+ end
+ end
+ @available.poll(@timeout)
diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
index 7af9079b48..3dcde699fc 100644
--- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
+++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
@@ -36,7 +36,7 @@ module ActiveRecord
def test_close
pool = ConnectionPool.new(Base::ConnectionSpecification.new({}, nil))
- pool.connections << adapter
+ pool.insert_connection_for_test! adapter
adapter.pool = pool
# Make sure the pool marks the connection in use
diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb
index 5cecfa90e7..2ada2c1fe1 100644
--- a/activerecord/test/cases/connection_pool_test.rb
+++ b/activerecord/test/cases/connection_pool_test.rb
@@ -128,6 +128,110 @@ module ActiveRecord
+ # The connection pool is "fair" if threads waiting for
+ # connections receive them the order in which they began
+ # waiting. This ensures that we don't timeout one HTTP request
+ # even while well under capacity in a multi-threaded environment
+ # such as a Java servlet container.
+ #
+ # We don't need strict fairness: if two connections become
+ # available at the same time, it's fine of two threads that were
+ # waiting acquire the connections out of order.
+ #
+ # Thus this test prepares waiting threads and then trickles in
+ # available connections slowly, ensuring the wakeup order is
+ # correct in this case.
+ def test_checkout_fairness
+ @pool.instance_variable_set(:@size, 10)
+ expected = (1..@pool.size).to_a.freeze
+ # check out all connections so our threads start out waiting
+ conns = expected.map { @pool.checkout }
+ mutex = Mutex.new
+ order = []
+ errors = []
+ threads = expected.map do |i|
+ t = Thread.new {
+ begin
+ @pool.checkout # connection return value never checked back in
+ mutex.synchronize { order << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until t.status == "sleep"
+ t
+ end
+ # this should wake up the waiting threads one by one in order
+ conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
+ threads.each(&:join)
+ raise errors.first if errors.any?
+ assert_equal(expected, order)
+ end
+ # As mentioned in #test_checkout_fairness, we don't care about
+ # strict fairness. This test creates two groups of threads:
+ # group1 whose members all start waiting before any thread in
+ # group2. Enough connections are checked in to wakeup all
+ # group1 threads, and the fact that only group1 and no group2
+ # threads acquired a connection is enforced.
+ def test_checkout_fairness_by_group
+ @pool.instance_variable_set(:@size, 10)
+ # take all the connections
+ conns = (1..10).map { @pool.checkout }
+ mutex = Mutex.new
+ successes = [] # threads that successfully got a connection
+ errors = []
+ make_thread = proc do |i|
+ t = Thread.new {
+ begin
+ @pool.checkout # connection return value never checked back in
+ mutex.synchronize { successes << i }
+ rescue => e
+ mutex.synchronize { errors << e }
+ end
+ }
+ Thread.pass until t.status == "sleep"
+ t
+ end
+ # all group1 threads start waiting before any in group2
+ group1 = (1..5).map(&make_thread)
+ group2 = (6..10).map(&make_thread)
+ # checkin n connections back to the pool
+ checkin = proc do |n|
+ n.times do
+ c = conns.pop
+ @pool.checkin(c)
+ end
+ end
+ checkin.call(group1.size) # should wake up all group1
+ loop do
+ sleep 0.1
+ break if mutex.synchronize { (successes.size + errors.size) == group1.size }
+ end
+ winners = mutex.synchronize { successes.dup }
+ checkin.call(group2.size) # should wake up everyone remaining
+ group1.each(&:join)
+ group2.each(&:join)
+ assert_equal((1..group1.size).to_a, winners.sort)
+ if errors.any?
+ raise errors.first
+ end
+ end
def test_automatic_reconnect=
pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
assert pool.automatic_reconnect