aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord
diff options
context:
space:
mode:
authorthedarkone <thedarkone2@gmail.com>2015-05-14 02:29:59 +0200
committerthedarkone <thedarkone2@gmail.com>2015-05-14 02:29:59 +0200
commite92f5a99d6a6306d96b4c7dd8f96c2822f2f2c7b (patch)
tree57d868e732b8aebfd7b5688bacb09d51dd43b1c4 /activerecord
parenta3923e667ca2c78734665389be964aa8492cfe1c (diff)
downloadrails-e92f5a99d6a6306d96b4c7dd8f96c2822f2f2c7b.tar.gz
rails-e92f5a99d6a6306d96b4c7dd8f96c2822f2f2c7b.tar.bz2
rails-e92f5a99d6a6306d96b4c7dd8f96c2822f2f2c7b.zip
AR::ConPool - establish connections outside of critical section.
Diffstat (limited to 'activerecord')
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb128
-rw-r--r--activerecord/test/cases/connection_adapters/adapter_leasing_test.rb2
-rw-r--r--activerecord/test/cases/connection_pool_test.rb34
3 files changed, 136 insertions, 28 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index aeaaa48945..1d550c0b39 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -129,17 +129,15 @@ module ActiveRecord
# - ConnectionTimeoutError if +timeout+ is given and no element
# becomes available within +timeout+ seconds,
def poll(timeout = nil)
- synchronize do
- if timeout
- no_wait_poll || wait_poll(timeout)
- else
- no_wait_poll
- end
- end
+ synchronize { internal_poll(timeout) }
end
private
+ def internal_poll(timeout)
+ no_wait_poll || (timeout && wait_poll(timeout))
+ end
+
def synchronize(&block)
@lock.synchronize(&block)
end
@@ -193,6 +191,20 @@ module ActiveRecord
end
end
+ # Connections must be leased while holding the main pool mutex. This is
+ # an internal subclass that also +.leases+ returned connections while
+ # still in queue's critical section (queue synchronizes with the same
+ # +@lock+ as the main pool) so that a returned connection is already
+ # leased and there is no need to re-enter synchronized block.
+ class ConnectionLeasingQueue < Queue # :nodoc:
+ private
+ def internal_poll(timeout)
+ conn = super
+ conn.lease if conn
+ conn
+ end
+ end
+
# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
# A reaper instantiated with a nil frequency will never reap the
# connection pool.
@@ -247,7 +259,12 @@ module ActiveRecord
@connections = []
@automatic_reconnect = true
- @available = Queue.new self
+ # Connection pool allows for concurrent (outside the main `synchronize` section)
+ # establishment of new connections. This variable tracks the number of threads
+ # currently in the process of independently establishing connections to the DB.
+ @now_connecting = 0
+
+ @available = ConnectionLeasingQueue.new self
end
# Retrieve the connection associated with the current thread, or call
@@ -340,8 +357,7 @@ module ActiveRecord
# Raises:
# - ConnectionTimeoutError: no connection can be obtained from the pool.
def checkout
- conn = synchronize { acquire_connection.tap(&:lease) }
- checkout_and_verify(conn)
+ checkout_and_verify(acquire_connection)
end
# Check-in a database connection back into the pool, indicating that you
@@ -366,6 +382,8 @@ module ActiveRecord
# Remove a connection from the connection pool. The connection will
# remain open and active but will no longer be managed by this pool.
def remove(conn)
+ needs_new_connection = false
+
synchronize do
@connections.delete conn
@available.delete conn
@@ -381,7 +399,17 @@ module ActiveRecord
# that are "stuck" there are helpless, they have no way of creating
# new connections and are completely reliant on us feeding available
# connections into the Queue.
- @available.add checkout_new_connection if @available.any_waiting?
+ needs_new_connection = @available.any_waiting?
+ end
+
+ # This is intentionally done outside of the synchronized section as we
+ # would like not to hold the main mutex while checking out new connections,
+ # thus there is some chance that needs_new_connection information is now
+ # stale, we can live with that (try_to_checkout_new_connection will make
+ # sure not to exceed the pool's @size limit).
+ if needs_new_connection && new_conn = try_to_checkout_new_connection
+ # make the new_conn available to the starving threads stuck @available Queue
+ checkin new_conn
end
end
@@ -409,14 +437,19 @@ module ActiveRecord
private
def synchronized_connection_retrieval
- conn = synchronize do
- # re-checking under lock for correct DCL semantics
- @reserved_connections[current_connection_id] ||= acquire_connection.tap(&:lease)
- end
- begin
- checkout_and_verify(conn)
- rescue Exception # clean up if something goes wrong in post_checkout
- synchronize { @reserved_connections.delete_pair(current_connection_id, conn) }
+ conn = checkout
+ previous_value = nil
+ synchronize do # re-checking under lock for correct DCL semantics
+ # Cache#put_if_absent returns either `nil` (if insertion was successful, ie there was
+ # no previous current_connection_id mapping) or an existing value (if insertion
+ # failed because there already was a current_connection_id mapping)
+ previous_value = @reserved_connections.put_if_absent(current_connection_id, conn)
+ end
+ if previous_value # if we were too late and insertion failed
+ checkin(conn)
+ previous_value
+ else
+ conn
end
end
@@ -427,11 +460,19 @@ module ActiveRecord
#
# Raises:
# - ConnectionTimeoutError if a connection could not be acquired
+ #
+ #--
+ # Implementation detail: the connection returned by +acquire_connection+
+ # will already be "+connection.lease+ -ed" to the current thread.
def acquire_connection
- if conn = @available.poll
+ # NOTE: we rely on `@available.poll` and `try_to_checkout_new_connection` to
+ # `conn.lease` the returned connection (and to do this in a `synchronized`
+ # section), this is not the cleanest implementation, as ideally we would
+ # `synchronize { conn.lease }` in this method, but by leaving it to `@available.poll`
+ # and `try_to_checkout_new_connection` we can piggyback on `synchronize` sections
+ # of the said methods and avoid an additional `synchronize` overhead.
+ if conn = @available.poll || try_to_checkout_new_connection
conn
- elsif @connections.size < @size
- checkout_new_connection
else
reap
@available.poll(@checkout_timeout)
@@ -456,13 +497,46 @@ module ActiveRecord
Base.connection_id ||= Thread.current.object_id
end
+ # If the pool is not at a +@size+ limit, establish new connection. Connecting
+ # to the DB is done outside main synchronized section.
+ #--
+ # Implementation constraint: a newly established connection returned by this
+ # method must be in the +.leased+ state.
+ def try_to_checkout_new_connection
+ # first in synchronized section check if establishing new conns is allowed
+ # and increment @now_connecting, to prevent overstepping this pool's @size
+ # constraint
+ do_checkout = synchronize do
+ if (@connections.size + @now_connecting) < @size
+ @now_connecting += 1
+ end
+ end
+ if do_checkout
+ begin
+ # if successfully incremented @now_connecting establish new connection
+ # outside of synchronized section
+ conn = checkout_new_connection
+ ensure
+ synchronize do
+ if conn
+ adopt_connection(conn)
+ # returned conn needs to be already leased
+ conn.lease
+ end
+ @now_connecting -= 1
+ end
+ end
+ end
+ end
+
+ def adopt_connection(conn)
+ conn.pool = self
+ @connections << conn
+ end
+
def checkout_new_connection
raise ConnectionNotEstablished unless @automatic_reconnect
-
- c = new_connection
- c.pool = self
- @connections << c
- c
+ new_connection
end
def checkout_and_verify(c)
diff --git a/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb b/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb
index fd5f183ab0..580568c8ac 100644
--- a/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb
+++ b/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb
@@ -6,7 +6,7 @@ module ActiveRecord
class Pool < ConnectionPool
def insert_connection_for_test!(c)
synchronize do
- @connections << c
+ adopt_connection(c)
@available.add c
end
end
diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb
index f5928814a3..3e563cd7cf 100644
--- a/activerecord/test/cases/connection_pool_test.rb
+++ b/activerecord/test/cases/connection_pool_test.rb
@@ -356,6 +356,40 @@ module ActiveRecord
pool.checkin connection
end
+
+ def test_concurrent_connection_establishment
+ all_threads_in_new_connection = ActiveSupport::Concurrency::Latch.new(@pool.size)
+ all_go = ActiveSupport::Concurrency::Latch.new
+
+ @pool.singleton_class.class_eval do
+ define_method(:new_connection) do
+ all_threads_in_new_connection.release
+ all_go.await
+ super()
+ end
+ end
+
+ connecting_threads = []
+ @pool.size.times do
+ connecting_threads << Thread.new { @pool.checkout }
+ end
+
+ begin
+ Timeout.timeout(5) do
+ # the kernel of the whole test is here, everything else is just scaffolding,
+ # this latch will not be released unless conn. pool allows for concurrent
+ # connection creation
+ all_threads_in_new_connection.await
+ end
+ rescue Timeout::Error
+ flunk 'pool unable to establish connections concurrently or implementation has ' <<
+ 'changed, this test then needs to patch a different :new_connection method'
+ ensure
+ # clean up the threads
+ all_go.release
+ connecting_threads.map(&:join)
+ end
+ end
end
end
end