aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2015-05-16 02:12:36 +0930
committerMatthew Draper <matthew@trebex.net>2015-05-16 02:12:36 +0930
commit97c482ff8eb652d4c18ab49fba034c506be902b8 (patch)
tree71a6a10049f4b228916120cea30ce6ddcf5eb95f /activerecord
parenta4dd94c61fd00d0f64189f6889a741b185edba8b (diff)
parent603fe20c0b8c05bc1cca8d01caadbd7060518141 (diff)
downloadrails-97c482ff8eb652d4c18ab49fba034c506be902b8.tar.gz
rails-97c482ff8eb652d4c18ab49fba034c506be902b8.tar.bz2
rails-97c482ff8eb652d4c18ab49fba034c506be902b8.zip
Merge pull request #14938 from thedarkone/pool-lock-fix
Reducing AR::ConPool's critical (synchronized) section
Diffstat (limited to 'activerecord')
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb474
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract_adapter.rb17
-rw-r--r--activerecord/lib/active_record/connection_handling.rb2
-rw-r--r--activerecord/test/cases/connection_adapters/adapter_leasing_test.rb6
-rw-r--r--activerecord/test/cases/connection_pool_test.rb170
5 files changed, 577 insertions, 92 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index 40c83b26af..5bfb4d779c 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -9,6 +9,12 @@ module ActiveRecord
class ConnectionTimeoutError < ConnectionNotEstablished
end
+ # Raised when a pool was unable to get ahold of all its connections
+ # to perform a "group" action such as +ConnectionPool#disconnect!+
+ # or +ConnectionPool#clear_reloadable_connections!+.
+ class ExclusiveConnectionTimeoutError < ConnectionTimeoutError
+ end
+
module ConnectionAdapters
# Connection pool base class for managing Active Record database
# connections.
@@ -62,6 +68,15 @@ module ActiveRecord
# connection at the end of a thread or a thread dies unexpectedly.
# Regardless of this setting, the Reaper will be invoked before every
# blocking wait. (Default nil, which means don't schedule the Reaper).
+ #
+ #--
+ # Synchronization policy:
+ # * all public methods can be called outside +synchronize+
+ # * access to these i-vars needs to be in +synchronize+:
+ # * @connections
+ # * @now_connecting
+ # * private methods that require being called in a +synchronize+ blocks
+ # are now explicitly documented
class ConnectionPool
# Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
# with which it shares a Monitor. But could be a generic Queue.
@@ -128,17 +143,15 @@ module ActiveRecord
# - ConnectionTimeoutError if +timeout+ is given and no element
# becomes available within +timeout+ seconds,
def poll(timeout = nil)
- synchronize do
- if timeout
- no_wait_poll || wait_poll(timeout)
- else
- no_wait_poll
- end
- end
+ synchronize { internal_poll(timeout) }
end
private
+ def internal_poll(timeout)
+ no_wait_poll || (timeout && wait_poll(timeout))
+ end
+
def synchronize(&block)
@lock.synchronize(&block)
end
@@ -192,6 +205,80 @@ module ActiveRecord
end
end
+ # Adds the ability to turn a basic fair FIFO queue into one
+ # biased to some thread.
+ module BiasableQueue # :nodoc:
+ class BiasedConditionVariable # :nodoc:
+ # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+,
+ # +signal+ and +wait+ methods are only called while holding a lock
+ def initialize(lock, other_cond, preferred_thread)
+ @real_cond = lock.new_cond
+ @other_cond = other_cond
+ @preferred_thread = preferred_thread
+ @num_waiting_on_real_cond = 0
+ end
+
+ def broadcast
+ broadcast_on_biased
+ @other_cond.broadcast
+ end
+
+ def broadcast_on_biased
+ @num_waiting_on_real_cond = 0
+ @real_cond.broadcast
+ end
+
+ def signal
+ if @num_waiting_on_real_cond > 0
+ @num_waiting_on_real_cond -= 1
+ @real_cond
+ else
+ @other_cond
+ end.signal
+ end
+
+ def wait(timeout)
+ if Thread.current == @preferred_thread
+ @num_waiting_on_real_cond += 1
+ @real_cond
+ else
+ @other_cond
+ end.wait(timeout)
+ end
+ end
+
+ def with_a_bias_for(thread)
+ previous_cond = nil
+ new_cond = nil
+ synchronize do
+ previous_cond = @cond
+ @cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread)
+ end
+ yield
+ ensure
+ synchronize do
+ @cond = previous_cond if previous_cond
+ new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers
+ end
+ end
+ end
+
+ # Connections must be leased while holding the main pool mutex. This is
+ # an internal subclass that also +.leases+ returned connections while
+ # still in queue's critical section (queue synchronizes with the same
+ # +@lock+ as the main pool) so that a returned connection is already
+ # leased and there is no need to re-enter synchronized block.
+ class ConnectionLeasingQueue < Queue # :nodoc:
+ include BiasableQueue
+
+ private
+ def internal_poll(timeout)
+ conn = super
+ conn.lease if conn
+ conn
+ end
+ end
+
# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
# A reaper instantiated with a nil frequency will never reap the
# connection pool.
@@ -240,56 +327,75 @@ module ActiveRecord
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
- # The cache of reserved connections mapped to threads
- @reserved_connections = ThreadSafe::Cache.new(:initial_capacity => @size)
+ # The cache of threads mapped to reserved connections, the sole purpose
+ # of the cache is to speed-up +connection+ method, it is not the authoritative
+ # registry of which thread owns which connection, that is tracked by
+ # +connection.owner+ attr on each +connection+ instance.
+ # The invariant works like this: if there is mapping of +thread => conn+,
+ # then that +thread+ does indeed own that +conn+, however an absence of a such
+ # mapping does not mean that the +thread+ doesn't own the said connection, in
+ # that case +conn.owner+ attr should be consulted.
+ # Access and modification of +@thread_cached_conns+ does not require
+ # synchronization.
+ @thread_cached_conns = ThreadSafe::Cache.new(:initial_capacity => @size)
@connections = []
@automatic_reconnect = true
- @available = Queue.new self
+ # Connection pool allows for concurrent (outside the main `synchronize` section)
+ # establishment of new connections. This variable tracks the number of threads
+ # currently in the process of independently establishing connections to the DB.
+ @now_connecting = 0
+
+ # A boolean toggle that allows/disallows new connections.
+ @new_cons_enabled = true
+
+ @available = ConnectionLeasingQueue.new self
end
# Retrieve the connection associated with the current thread, or call
# #checkout to obtain one if necessary.
#
# #connection can be called any number of times; the connection is
- # held in a hash keyed by the thread id.
+ # held in a cache keyed by a thread.
def connection
- # this is correctly done double-checked locking
- # (ThreadSafe::Cache's lookups have volatile semantics)
- @reserved_connections[current_connection_id] || synchronize do
- @reserved_connections[current_connection_id] ||= checkout
- end
+ @thread_cached_conns[connection_cache_key(Thread.current)] ||= checkout
end
# Is there an open connection that is being used for the current thread?
+ #
+ # This method only works for connections that have been abtained through
+ # #connection or #with_connection methods, connections obtained through
+ # #checkout will not be detected by #active_connection?
def active_connection?
- synchronize do
- @reserved_connections.fetch(current_connection_id) {
- return false
- }.in_use?
- end
+ @thread_cached_conns[connection_cache_key(Thread.current)]
end
# Signal that the thread is finished with the current connection.
# #release_connection releases the connection-thread association
# and returns the connection to the pool.
- def release_connection(with_id = current_connection_id)
- synchronize do
- conn = @reserved_connections.delete(with_id)
- checkin conn if conn
+ #
+ # This method only works for connections that have been obtained through
+ # #connection or #with_connection methods, connections obtained through
+ # #checkout will not be automatically released.
+ def release_connection(owner_thread = Thread.current)
+ if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
+ checkin conn
end
end
- # If a connection already exists yield it to the block. If no connection
+ # If a connection obtained through #connection or #with_connection methods
+ # already exists yield it to the block. If no such connection
# exists checkout a connection, yield it to the block, and checkin the
# connection when finished.
def with_connection
- connection_id = current_connection_id
- fresh_connection = true unless active_connection?
- yield connection
+ unless conn = @thread_cached_conns[connection_cache_key(Thread.current)]
+ conn = connection
+ fresh_connection = true
+ end
+ yield conn
ensure
- release_connection(connection_id) if fresh_connection
+ release_connection if fresh_connection
end
# Returns true if a connection has already been opened.
@@ -298,32 +404,81 @@ module ActiveRecord
end
# Disconnects all connections in the pool, and clears the pool.
- def disconnect!
- synchronize do
- @reserved_connections.clear
- @connections.each do |conn|
- checkin conn
- conn.disconnect!
+ #
+ # Raises:
+ # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all
+ # connections in the pool within a timeout interval (default duration is
+ # +spec.config[:checkout_timeout] * 2+ seconds).
+ def disconnect(raise_on_acquisition_timeout = true)
+ with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
+ synchronize do
+ @connections.each do |conn|
+ checkin conn
+ conn.disconnect!
+ end
+ @connections = []
+ @available.clear
end
- @connections = []
- @available.clear
end
end
- # Clears the cache which maps classes.
- def clear_reloadable_connections!
- synchronize do
- @reserved_connections.clear
- @connections.each do |conn|
- checkin conn
- conn.disconnect! if conn.requires_reloading?
- end
- @connections.delete_if(&:requires_reloading?)
- @available.clear
- @connections.each do |conn|
- @available.add conn
+ # Disconnects all connections in the pool, and clears the pool.
+ #
+ # The pool first tries to gain ownership of all connections, if unable to
+ # do so within a timeout interval (default duration is
+ # +spec.config[:checkout_timeout] * 2+ seconds), the pool is forcefully
+ # disconneted wihout any regard for other connection owning threads.
+ def disconnect!
+ disconnect(false)
+ end
+
+ # Clears the cache which maps classes and re-connects connections that
+ # require reloading.
+ #
+ # Raises:
+ # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all
+ # connections in the pool within a timeout interval (default duration is
+ # +spec.config[:checkout_timeout] * 2+ seconds).
+ def clear_reloadable_connections(raise_on_acquisition_timeout = true)
+ num_new_conns_required = 0
+
+ with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
+ synchronize do
+ @connections.each do |conn|
+ checkin conn
+ conn.disconnect! if conn.requires_reloading?
+ end
+ @connections.delete_if(&:requires_reloading?)
+
+ @available.clear
+
+ if @connections.size < @size
+ # because of the pruning done by this method, we might be running
+ # low on connections, while threads stuck in queue are helpless
+ # (not being able to establish new connections for themselves),
+ # see also more detailed explanation in +remove+
+ num_new_conns_required = num_waiting_in_queue - @connections.size
+ end
+
+ @connections.each do |conn|
+ @available.add conn
+ end
end
end
+
+ bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0
+ end
+
+ # Clears the cache which maps classes and re-connects connections that
+ # require reloading.
+ #
+ # The pool first tries to gain ownership of all connections, if unable to
+ # do so within a timeout interval (default duration is
+ # +spec.config[:checkout_timeout] * 2+ seconds), the pool forcefully
+ # clears the cache and reloads connections without any regard for other
+ # connection owning threads.
+ def clear_reloadable_connections!
+ clear_reloadable_connections(false)
end
# Check-out a database connection from the pool, indicating that you want
@@ -340,12 +495,8 @@ module ActiveRecord
#
# Raises:
# - ConnectionTimeoutError: no connection can be obtained from the pool.
- def checkout
- synchronize do
- conn = acquire_connection
- conn.lease
- checkout_and_verify(conn)
- end
+ def checkout(checkout_timeout = @checkout_timeout)
+ checkout_and_verify(acquire_connection(checkout_timeout))
end
# Check-in a database connection back into the pool, indicating that you
@@ -355,14 +506,12 @@ module ActiveRecord
# calling +checkout+ on this pool.
def checkin(conn)
synchronize do
- owner = conn.owner
+ remove_connection_from_thread_cache conn
conn.run_callbacks :checkin do
conn.expire
end
- release conn, owner
-
@available.add conn
end
end
@@ -370,14 +519,32 @@ module ActiveRecord
# Remove a connection from the connection pool. The connection will
# remain open and active but will no longer be managed by this pool.
def remove(conn)
+ needs_new_connection = false
+
synchronize do
+ remove_connection_from_thread_cache conn
+
@connections.delete conn
@available.delete conn
- release conn, conn.owner
-
- @available.add checkout_new_connection if @available.any_waiting?
+ # @available.any_waiting? => true means that prior to removing this
+ # conn, the pool was at its max size (@connections.size == @size)
+ # this would mean that any threads stuck waiting in the queue wouldn't
+ # know they could checkout_new_connection, so let's do it for them.
+ # Because condition-wait loop is encapsulated in the Queue class
+ # (that in turn is oblivious to ConnectionPool implementation), threads
+ # that are "stuck" there are helpless, they have no way of creating
+ # new connections and are completely reliant on us feeding available
+ # connections into the Queue.
+ needs_new_connection = @available.any_waiting?
end
+
+ # This is intentionally done outside of the synchronized section as we
+ # would like not to hold the main mutex while checking out new connections,
+ # thus there is some chance that needs_new_connection information is now
+ # stale, we can live with that (bulk_make_new_connections will make
+ # sure not to exceed the pool's @size limit).
+ bulk_make_new_connections(1) if needs_new_connection
end
# Recover lost connections for the pool. A lost connection can occur if
@@ -402,7 +569,118 @@ module ActiveRecord
end
end
+ def num_waiting_in_queue # :nodoc:
+ @available.num_waiting
+ end
+
private
+ #--
+ # this is unfortunately not concurrent
+ def bulk_make_new_connections(num_new_conns_needed)
+ num_new_conns_needed.times do
+ # try_to_checkout_new_connection will not exceed pool's @size limit
+ if new_conn = try_to_checkout_new_connection
+ # make the new_conn available to the starving threads stuck @available Queue
+ checkin(new_conn)
+ end
+ end
+ end
+
+ #--
+ # From the discussion on Github:
+ # https://github.com/rails/rails/pull/14938#commitcomment-6601951
+ # This hook-in method allows for easier monkey-patching fixes needed by
+ # JRuby users that use Fibers.
+ def connection_cache_key(thread)
+ thread
+ end
+
+ # Take control of all existing connections so a "group" action such as
+ # reload/disconnect can be performed safely. It is no longer enough to
+ # wrap it in +synchronize+ because some pool's actions are allowed
+ # to be performed outside of the main +synchronize+ block.
+ def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true)
+ with_new_connections_blocked do
+ attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout)
+ yield
+ end
+ end
+
+ def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true)
+ collected_conns = synchronize do
+ # account for our own connections
+ @connections.select {|conn| conn.owner == Thread.current}
+ end
+
+ newly_checked_out = []
+ timeout_time = Time.now + (@checkout_timeout * 2)
+
+ @available.with_a_bias_for(Thread.current) do
+ while true
+ synchronize do
+ return if collected_conns.size == @connections.size && @now_connecting == 0
+ remaining_timeout = timeout_time - Time.now
+ remaining_timeout = 0 if remaining_timeout < 0
+ conn = checkout_for_exclusive_access(remaining_timeout)
+ collected_conns << conn
+ newly_checked_out << conn
+ end
+ end
+ end
+ rescue ExclusiveConnectionTimeoutError
+ # `raise_on_acquisition_timeout == false` means we are directed to ignore any
+ # timeouts and are expected to just give up: we've obtained as many connections
+ # as possible, note that in a case like that we don't return any of the
+ # `newly_checked_out` connections.
+
+ if raise_on_acquisition_timeout
+ release_newly_checked_out = true
+ raise
+ end
+ rescue Exception # if something else went wrong
+ # this can't be a "naked" rescue, because we have should return conns
+ # even for non-StandardErrors
+ release_newly_checked_out = true
+ raise
+ ensure
+ if release_newly_checked_out && newly_checked_out
+ # releasing only those conns that were checked out in this method, conns
+ # checked outside this method (before it was called) are not for us to release
+ newly_checked_out.each {|conn| checkin(conn)}
+ end
+ end
+
+ #--
+ # Must be called in a synchronize block.
+ def checkout_for_exclusive_access(checkout_timeout)
+ checkout(checkout_timeout)
+ rescue ConnectionTimeoutError
+ # this block can't be easily moved into attempt_to_checkout_all_existing_connections's
+ # rescue block, because doing so would put it outside of synchronize section, without
+ # being in a critical section thread_report might become inaccurate
+ msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds"
+
+ thread_report = []
+ @connections.each do |conn|
+ unless conn.owner == Thread.current
+ thread_report << "#{conn} is owned by #{conn.owner}"
+ end
+ end
+
+ msg << " (#{thread_report.join(', ')})" if thread_report.any?
+
+ raise ExclusiveConnectionTimeoutError, msg
+ end
+
+ def with_new_connections_blocked
+ previous_value = nil
+ synchronize do
+ previous_value, @new_cons_enabled = @new_cons_enabled, false
+ end
+ yield
+ ensure
+ synchronize { @new_cons_enabled = previous_value }
+ end
# Acquire a connection by one of 1) immediately removing one
# from the queue of available connections, 2) creating a new
@@ -411,24 +689,31 @@ module ActiveRecord
#
# Raises:
# - ConnectionTimeoutError if a connection could not be acquired
- def acquire_connection
- if conn = @available.poll
+ #
+ #--
+ # Implementation detail: the connection returned by +acquire_connection+
+ # will already be "+connection.lease+ -ed" to the current thread.
+ def acquire_connection(checkout_timeout)
+ # NOTE: we rely on `@available.poll` and `try_to_checkout_new_connection` to
+ # `conn.lease` the returned connection (and to do this in a `synchronized`
+ # section), this is not the cleanest implementation, as ideally we would
+ # `synchronize { conn.lease }` in this method, but by leaving it to `@available.poll`
+ # and `try_to_checkout_new_connection` we can piggyback on `synchronize` sections
+ # of the said methods and avoid an additional `synchronize` overhead.
+ if conn = @available.poll || try_to_checkout_new_connection
conn
- elsif @connections.size < @size
- checkout_new_connection
else
reap
- @available.poll(@checkout_timeout)
+ @available.poll(checkout_timeout)
end
end
- def release(conn, owner)
- thread_id = owner.object_id
-
- if @reserved_connections[thread_id] == conn
- @reserved_connections.delete thread_id
- end
+ #--
+ # if owner_thread param is omitted, this must be called in synchronize block
+ def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
+ @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
end
+ alias_method :release, :remove_connection_from_thread_cache
def new_connection
Base.send(spec.adapter_method, spec.config).tap do |conn|
@@ -436,17 +721,46 @@ module ActiveRecord
end
end
- def current_connection_id #:nodoc:
- Base.connection_id ||= Thread.current.object_id
+ # If the pool is not at a +@size+ limit, establish new connection. Connecting
+ # to the DB is done outside main synchronized section.
+ #--
+ # Implementation constraint: a newly established connection returned by this
+ # method must be in the +.leased+ state.
+ def try_to_checkout_new_connection
+ # first in synchronized section check if establishing new conns is allowed
+ # and increment @now_connecting, to prevent overstepping this pool's @size
+ # constraint
+ do_checkout = synchronize do
+ if @new_cons_enabled && (@connections.size + @now_connecting) < @size
+ @now_connecting += 1
+ end
+ end
+ if do_checkout
+ begin
+ # if successfully incremented @now_connecting establish new connection
+ # outside of synchronized section
+ conn = checkout_new_connection
+ ensure
+ synchronize do
+ if conn
+ adopt_connection(conn)
+ # returned conn needs to be already leased
+ conn.lease
+ end
+ @now_connecting -= 1
+ end
+ end
+ end
+ end
+
+ def adopt_connection(conn)
+ conn.pool = self
+ @connections << conn
end
def checkout_new_connection
raise ConnectionNotEstablished unless @automatic_reconnect
-
- c = new_connection
- c.pool = self
- @connections << c
- c
+ new_connection
end
def checkout_and_verify(c)
diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
index 0705c22a8c..60655b673c 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
@@ -7,7 +7,6 @@ require 'active_record/connection_adapters/schema_cache'
require 'active_record/connection_adapters/sql_type_metadata'
require 'active_record/connection_adapters/abstract/schema_dumper'
require 'active_record/connection_adapters/abstract/schema_creation'
-require 'monitor'
require 'arel/collectors/bind'
require 'arel/collectors/sql_string'
@@ -70,7 +69,6 @@ module ActiveRecord
include DatabaseLimits
include QueryCache
include ActiveSupport::Callbacks
- include MonitorMixin
include ColumnDumper
SIMPLE_INT = /\A\d+\z/
@@ -141,12 +139,20 @@ module ActiveRecord
SchemaCreation.new self
end
+ # this method must only be called while holding connection pool's mutex
def lease
- synchronize do
- unless in_use?
- @owner = Thread.current
+ if in_use?
+ msg = 'Cannot lease connection, '
+ if @owner == Thread.current
+ msg << 'it is already leased by the current thread.'
+ else
+ msg << "it is already in use by a different thread: #{@owner}. " <<
+ "Current thread: #{Thread.current}."
end
+ raise ActiveRecordError, msg
end
+
+ @owner = Thread.current
end
def schema_cache=(cache)
@@ -154,6 +160,7 @@ module ActiveRecord
@schema_cache = cache
end
+ # this method must only be called while holding connection pool's mutex
def expire
@owner = nil
end
diff --git a/activerecord/lib/active_record/connection_handling.rb b/activerecord/lib/active_record/connection_handling.rb
index 24f5849e45..d6b661ff76 100644
--- a/activerecord/lib/active_record/connection_handling.rb
+++ b/activerecord/lib/active_record/connection_handling.rb
@@ -88,7 +88,7 @@ module ActiveRecord
end
def connection_id
- ActiveRecord::RuntimeRegistry.connection_id
+ ActiveRecord::RuntimeRegistry.connection_id ||= Thread.current.object_id
end
def connection_id=(connection_id)
diff --git a/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb b/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb
index 662e19f35e..580568c8ac 100644
--- a/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb
+++ b/activerecord/test/cases/connection_adapters/adapter_leasing_test.rb
@@ -6,7 +6,7 @@ module ActiveRecord
class Pool < ConnectionPool
def insert_connection_for_test!(c)
synchronize do
- @connections << c
+ adopt_connection(c)
@available.add c
end
end
@@ -24,7 +24,9 @@ module ActiveRecord
def test_lease_twice
assert @adapter.lease, 'should lease adapter'
- assert_not @adapter.lease, 'should not lease adapter'
+ assert_raises(ActiveRecordError) do
+ @adapter.lease
+ end
end
def test_expire_mutates_in_use
diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb
index f5928814a3..90b6fb55ef 100644
--- a/activerecord/test/cases/connection_pool_test.rb
+++ b/activerecord/test/cases/connection_pool_test.rb
@@ -100,7 +100,7 @@ module ActiveRecord
t = Thread.new { @pool.checkout }
# make sure our thread is in the timeout section
- Thread.pass until t.status == "sleep"
+ Thread.pass until @pool.num_waiting_in_queue == 1
connection = cs.first
connection.close
@@ -112,7 +112,7 @@ module ActiveRecord
t = Thread.new { @pool.checkout }
# make sure our thread is in the timeout section
- Thread.pass until t.status == "sleep"
+ Thread.pass until @pool.num_waiting_in_queue == 1
connection = cs.first
@pool.remove connection
@@ -234,7 +234,7 @@ module ActiveRecord
mutex.synchronize { errors << e }
end
}
- Thread.pass until t.status == "sleep"
+ Thread.pass until @pool.num_waiting_in_queue == i
t
end
@@ -271,7 +271,7 @@ module ActiveRecord
mutex.synchronize { errors << e }
end
}
- Thread.pass until t.status == "sleep"
+ Thread.pass until @pool.num_waiting_in_queue == i
t
end
@@ -356,6 +356,168 @@ module ActiveRecord
pool.checkin connection
end
+
+ def test_concurrent_connection_establishment
+ all_threads_in_new_connection = ActiveSupport::Concurrency::Latch.new(@pool.size)
+ all_go = ActiveSupport::Concurrency::Latch.new
+
+ @pool.singleton_class.class_eval do
+ define_method(:new_connection) do
+ all_threads_in_new_connection.release
+ all_go.await
+ super()
+ end
+ end
+
+ connecting_threads = []
+ @pool.size.times do
+ connecting_threads << Thread.new { @pool.checkout }
+ end
+
+ begin
+ Timeout.timeout(5) do
+ # the kernel of the whole test is here, everything else is just scaffolding,
+ # this latch will not be released unless conn. pool allows for concurrent
+ # connection creation
+ all_threads_in_new_connection.await
+ end
+ rescue Timeout::Error
+ flunk 'pool unable to establish connections concurrently or implementation has ' <<
+ 'changed, this test then needs to patch a different :new_connection method'
+ ensure
+ # clean up the threads
+ all_go.release
+ connecting_threads.map(&:join)
+ end
+ end
+
+ def test_non_bang_disconnect_and_clear_reloadable_connections_throw_exception_if_threads_dont_return_their_conns
+ @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
+ [:disconnect, :clear_reloadable_connections].each do |group_action_method|
+ @pool.with_connection do |connection|
+ assert_raises(ExclusiveConnectionTimeoutError) do
+ Thread.new { @pool.send(group_action_method) }.join
+ end
+ end
+ end
+ end
+
+ def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns
+ [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
+ begin
+ thread = timed_join_result = nil
+ @pool.with_connection do |connection|
+ thread = Thread.new { @pool.send(group_action_method) }
+
+ # give the other `thread` some time to get stuck in `group_action_method`
+ timed_join_result = thread.join(0.3)
+ # thread.join # => `nil` means the other thread hasn't finished running and is still waiting for us to
+ # release our connection
+ assert_nil timed_join_result
+
+ # assert that since this is within default timeout our connection hasn't been forcefully taken away from us
+ assert @pool.active_connection?
+ end
+ ensure
+ thread.join if thread && !timed_join_result # clean up the other thread
+ end
+ end
+ end
+
+ def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_aquire_all_connections_proceed_anyway
+ @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
+ [:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
+ @pool.with_connection do |connection|
+ Thread.new { @pool.send(group_action_method) }.join
+ # assert connection has been forcefully taken away from us
+ assert_not @pool.active_connection?
+ end
+ end
+ end
+
+ def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads
+ with_single_connection_pool do |pool|
+ [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
+ conn = pool.connection # drain the only available connection
+ second_thread_done = ActiveSupport::Concurrency::Latch.new
+
+ # create a first_thread and let it get into the FIFO queue first
+ first_thread = Thread.new do
+ pool.with_connection { second_thread_done.await }
+ end
+
+ # wait for first_thread to get in queue
+ Thread.pass until pool.num_waiting_in_queue == 1
+
+ # create a different, later thread, that will attempt to do a "group action",
+ # but because of the group action semantics it should be able to preempt the
+ # first_thread when a connection is made available
+ second_thread = Thread.new do
+ pool.send(group_action_method)
+ second_thread_done.release
+ end
+
+ # wait for second_thread to get in queue
+ Thread.pass until pool.num_waiting_in_queue == 2
+
+ # return the only available connection
+ pool.checkin(conn)
+
+ # if the second_thread is not able to preempt the first_thread,
+ # they will temporarily (until either of them timeouts with ConnectionTimeoutError)
+ # deadlock and a join(2) timeout will be reached
+ failed = true unless second_thread.join(2)
+
+ #--- post test clean up start
+ second_thread_done.release if failed
+
+ # after `pool.disconnect()` the first thread will be left stuck in queue, no need to wait for
+ # it to timeout with ConnectionTimeoutError
+ if (group_action_method == :disconnect || group_action_method == :disconnect!) && pool.num_waiting_in_queue > 0
+ pool.with_connection {} # create a new connection in case there are threads still stuck in a queue
+ end
+
+ first_thread.join
+ second_thread.join
+ #--- post test clean up end
+
+ flunk "#{group_action_method} is not able to preempt other waiting threads" if failed
+ end
+ end
+ end
+
+ def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary
+ with_single_connection_pool do |pool|
+ conn = pool.connection # drain the only available connection
+ def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections
+ true
+ end
+
+ stuck_thread = Thread.new do
+ pool.with_connection {}
+ end
+
+ # wait for stuck_thread to get in queue
+ Thread.pass until pool.num_waiting_in_queue == 1
+
+ pool.clear_reloadable_connections
+
+ unless stuck_thread.join(2)
+ flunk 'clear_reloadable_connections must not let other connection waiting threads get stuck in queue'
+ end
+
+ assert_equal 0, pool.num_waiting_in_queue
+ end
+ end
+
+ private
+ def with_single_connection_pool
+ one_conn_spec = ActiveRecord::Base.connection_pool.spec.dup
+ one_conn_spec.config[:pool] = 1 # this is safe to do, because .dupped ConnectionSpecification also auto-dups its config
+ yield(pool = ConnectionPool.new(one_conn_spec))
+ ensure
+ pool.disconnect! if pool
+ end
end
end
end