aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
diff options
context:
space:
mode:
authorPatrick Mahoney <pat@polycrystal.org>2012-05-20 21:46:08 -0500
committerPatrick Mahoney <pat@polycrystal.org>2012-05-20 21:48:04 -0500
commitd06674d5dfcecb6355890020ee2245b6e87b07e4 (patch)
tree4fecc2e1773aff8e92987a7a3ab4709601494e85 /activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
parent7c95be54b4c3f8ad2273eea39afa233f8f8b31c1 (diff)
downloadrails-d06674d5dfcecb6355890020ee2245b6e87b07e4.tar.gz
rails-d06674d5dfcecb6355890020ee2245b6e87b07e4.tar.bz2
rails-d06674d5dfcecb6355890020ee2245b6e87b07e4.zip
Make connection pool fair with respect to waiting threads.
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb')
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb120
1 files changed, 86 insertions, 34 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index c6699737b4..450ef69744 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -2,7 +2,6 @@ require 'thread'
require 'monitor'
require 'set'
require 'active_support/core_ext/module/deprecation'
-require 'timeout'
module ActiveRecord
# Raised when a connection could not be obtained within the connection
@@ -92,21 +91,6 @@ module ActiveRecord
attr_accessor :automatic_reconnect, :timeout
attr_reader :spec, :connections, :size, :reaper
- class Latch # :nodoc:
- def initialize
- @mutex = Mutex.new
- @cond = ConditionVariable.new
- end
-
- def release
- @mutex.synchronize { @cond.broadcast }
- end
-
- def await
- @mutex.synchronize { @cond.wait @mutex }
- end
- end
-
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
# host name, username, password, etc), as well as the maximum size for
@@ -128,9 +112,25 @@ module ActiveRecord
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
- @latch = Latch.new
@connections = []
@automatic_reconnect = true
+
+ # connections available to be checked out
+ @available = []
+
+ # number of threads waiting to check out a connection
+ @num_waiting = 0
+
+ # signal threads waiting
+ @cond = new_cond
+ end
+
+ # Hack for tests to be able to add connections. Do not call outside of tests
+ def insert_connection_for_test!(c)
+ synchronize do
+ @connections << c
+ @available << c
+ end
end
# Retrieve the connection associated with the current thread, or call
@@ -188,6 +188,7 @@ module ActiveRecord
conn.disconnect!
end
@connections = []
+ @available = []
end
end
@@ -202,6 +203,9 @@ module ActiveRecord
@connections.delete_if do |conn|
conn.requires_reloading?
end
+ @available.delete_if do |conn|
+ conn.requires_reloading?
+ end
end
end
@@ -225,23 +229,19 @@ module ActiveRecord
# Raises:
# - PoolFullError: no connection can be obtained from the pool.
def checkout
- loop do
- # Checkout an available connection
- synchronize do
- # Try to find a connection that hasn't been leased, and lease it
- conn = connections.find { |c| c.lease }
-
- # If all connections were leased, and we have room to expand,
- # create a new connection and lease it.
- if !conn && connections.size < size
- conn = checkout_new_connection
- conn.lease
- end
+ synchronize do
+ conn = nil
+
+ if @num_waiting == 0
+ conn = acquire_connection
+ end
- return checkout_and_verify(conn) if conn
+ unless conn
+ conn = wait_until(@timeout) { acquire_connection }
end
- Timeout.timeout(@timeout, PoolFullError) { @latch.await }
+ conn.lease
+ checkout_and_verify(conn)
end
end
@@ -257,8 +257,10 @@ module ActiveRecord
end
release conn
+
+ @available.unshift conn
+ @cond.signal
end
- @latch.release
end
# Remove a connection from the connection pool. The connection will
@@ -266,12 +268,14 @@ module ActiveRecord
def remove(conn)
synchronize do
@connections.delete conn
+ @available.delete conn
# FIXME: we might want to store the key on the connection so that removing
# from the reserved hash will be a little easier.
release conn
+
+ @cond.signal # can make a new connection now
end
- @latch.release
end
# Removes dead connections from the pool. A dead connection can occur
@@ -283,12 +287,60 @@ module ActiveRecord
connections.dup.each do |conn|
remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
end
+ @cond.broadcast # may violate fairness
end
- @latch.release
end
private
+ # Take an available connection or, if possible, create a new
+ # one, or nil.
+ #
+ # Monitor must be held while calling this method.
+ #
+ # Returns: a newly acquired connection.
+ def acquire_connection
+ if @available.any?
+ @available.pop
+ elsif connections.size < size
+ checkout_new_connection
+ end
+ end
+
+ # Wait on +@cond+ until the block returns non-nil. Note that
+ # unlike MonitorMixin::ConditionVariable#wait_until, this method
+ # does not test the block before the first wait period.
+ #
+ # Monitor must be held when calling this method.
+ #
+ # +timeout+: Integer timeout in seconds
+ #
+ # Returns: the result of the block
+ #
+ # Raises:
+ # - PoolFullError: timeout elapsed before +&block+ returned a connection
+ def wait_until(timeout, &block)
+ @num_waiting += 1
+ begin
+ t0 = Time.now
+ loop do
+ elapsed = Time.now - t0
+ if elapsed >= timeout
+ msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
+ [timeout, elapsed]
+ raise PoolFullError, msg
+ end
+
+ @cond.wait(timeout - elapsed)
+
+ conn = yield
+ return conn if conn
+ end
+ ensure
+ @num_waiting -= 1
+ end
+ end
+
def release(conn)
thread_id = if @reserved_connections[current_connection_id] == conn
current_connection_id