diff options
Diffstat (limited to 'activerecord/lib')
3 files changed, 407 insertions, 86 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 40c83b26af..5bfb4d779c 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -9,6 +9,12 @@ module ActiveRecord class ConnectionTimeoutError < ConnectionNotEstablished end + # Raised when a pool was unable to get ahold of all its connections + # to perform a "group" action such as +ConnectionPool#disconnect!+ + # or +ConnectionPool#clear_reloadable_connections!+. + class ExclusiveConnectionTimeoutError < ConnectionTimeoutError + end + module ConnectionAdapters # Connection pool base class for managing Active Record database # connections. @@ -62,6 +68,15 @@ module ActiveRecord # connection at the end of a thread or a thread dies unexpectedly. # Regardless of this setting, the Reaper will be invoked before every # blocking wait. (Default nil, which means don't schedule the Reaper). + # + #-- + # Synchronization policy: + # * all public methods can be called outside +synchronize+ + # * access to these i-vars needs to be in +synchronize+: + # * @connections + # * @now_connecting + # * private methods that require being called in a +synchronize+ blocks + # are now explicitly documented class ConnectionPool # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool # with which it shares a Monitor. But could be a generic Queue. @@ -128,17 +143,15 @@ module ActiveRecord # - ConnectionTimeoutError if +timeout+ is given and no element # becomes available within +timeout+ seconds, def poll(timeout = nil) - synchronize do - if timeout - no_wait_poll || wait_poll(timeout) - else - no_wait_poll - end - end + synchronize { internal_poll(timeout) } end private + def internal_poll(timeout) + no_wait_poll || (timeout && wait_poll(timeout)) + end + def synchronize(&block) @lock.synchronize(&block) end @@ -192,6 +205,80 @@ module ActiveRecord end end + # Adds the ability to turn a basic fair FIFO queue into one + # biased to some thread. + module BiasableQueue # :nodoc: + class BiasedConditionVariable # :nodoc: + # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+, + # +signal+ and +wait+ methods are only called while holding a lock + def initialize(lock, other_cond, preferred_thread) + @real_cond = lock.new_cond + @other_cond = other_cond + @preferred_thread = preferred_thread + @num_waiting_on_real_cond = 0 + end + + def broadcast + broadcast_on_biased + @other_cond.broadcast + end + + def broadcast_on_biased + @num_waiting_on_real_cond = 0 + @real_cond.broadcast + end + + def signal + if @num_waiting_on_real_cond > 0 + @num_waiting_on_real_cond -= 1 + @real_cond + else + @other_cond + end.signal + end + + def wait(timeout) + if Thread.current == @preferred_thread + @num_waiting_on_real_cond += 1 + @real_cond + else + @other_cond + end.wait(timeout) + end + end + + def with_a_bias_for(thread) + previous_cond = nil + new_cond = nil + synchronize do + previous_cond = @cond + @cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread) + end + yield + ensure + synchronize do + @cond = previous_cond if previous_cond + new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers + end + end + end + + # Connections must be leased while holding the main pool mutex. This is + # an internal subclass that also +.leases+ returned connections while + # still in queue's critical section (queue synchronizes with the same + # +@lock+ as the main pool) so that a returned connection is already + # leased and there is no need to re-enter synchronized block. + class ConnectionLeasingQueue < Queue # :nodoc: + include BiasableQueue + + private + def internal_poll(timeout) + conn = super + conn.lease if conn + conn + end + end + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. # A reaper instantiated with a nil frequency will never reap the # connection pool. @@ -240,56 +327,75 @@ module ActiveRecord # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 - # The cache of reserved connections mapped to threads - @reserved_connections = ThreadSafe::Cache.new(:initial_capacity => @size) + # The cache of threads mapped to reserved connections, the sole purpose + # of the cache is to speed-up +connection+ method, it is not the authoritative + # registry of which thread owns which connection, that is tracked by + # +connection.owner+ attr on each +connection+ instance. + # The invariant works like this: if there is mapping of +thread => conn+, + # then that +thread+ does indeed own that +conn+, however an absence of a such + # mapping does not mean that the +thread+ doesn't own the said connection, in + # that case +conn.owner+ attr should be consulted. + # Access and modification of +@thread_cached_conns+ does not require + # synchronization. + @thread_cached_conns = ThreadSafe::Cache.new(:initial_capacity => @size) @connections = [] @automatic_reconnect = true - @available = Queue.new self + # Connection pool allows for concurrent (outside the main `synchronize` section) + # establishment of new connections. This variable tracks the number of threads + # currently in the process of independently establishing connections to the DB. + @now_connecting = 0 + + # A boolean toggle that allows/disallows new connections. + @new_cons_enabled = true + + @available = ConnectionLeasingQueue.new self end # Retrieve the connection associated with the current thread, or call # #checkout to obtain one if necessary. # # #connection can be called any number of times; the connection is - # held in a hash keyed by the thread id. + # held in a cache keyed by a thread. def connection - # this is correctly done double-checked locking - # (ThreadSafe::Cache's lookups have volatile semantics) - @reserved_connections[current_connection_id] || synchronize do - @reserved_connections[current_connection_id] ||= checkout - end + @thread_cached_conns[connection_cache_key(Thread.current)] ||= checkout end # Is there an open connection that is being used for the current thread? + # + # This method only works for connections that have been abtained through + # #connection or #with_connection methods, connections obtained through + # #checkout will not be detected by #active_connection? def active_connection? - synchronize do - @reserved_connections.fetch(current_connection_id) { - return false - }.in_use? - end + @thread_cached_conns[connection_cache_key(Thread.current)] end # Signal that the thread is finished with the current connection. # #release_connection releases the connection-thread association # and returns the connection to the pool. - def release_connection(with_id = current_connection_id) - synchronize do - conn = @reserved_connections.delete(with_id) - checkin conn if conn + # + # This method only works for connections that have been obtained through + # #connection or #with_connection methods, connections obtained through + # #checkout will not be automatically released. + def release_connection(owner_thread = Thread.current) + if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread)) + checkin conn end end - # If a connection already exists yield it to the block. If no connection + # If a connection obtained through #connection or #with_connection methods + # already exists yield it to the block. If no such connection # exists checkout a connection, yield it to the block, and checkin the # connection when finished. def with_connection - connection_id = current_connection_id - fresh_connection = true unless active_connection? - yield connection + unless conn = @thread_cached_conns[connection_cache_key(Thread.current)] + conn = connection + fresh_connection = true + end + yield conn ensure - release_connection(connection_id) if fresh_connection + release_connection if fresh_connection end # Returns true if a connection has already been opened. @@ -298,32 +404,81 @@ module ActiveRecord end # Disconnects all connections in the pool, and clears the pool. - def disconnect! - synchronize do - @reserved_connections.clear - @connections.each do |conn| - checkin conn - conn.disconnect! + # + # Raises: + # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all + # connections in the pool within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds). + def disconnect(raise_on_acquisition_timeout = true) + with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do + synchronize do + @connections.each do |conn| + checkin conn + conn.disconnect! + end + @connections = [] + @available.clear end - @connections = [] - @available.clear end end - # Clears the cache which maps classes. - def clear_reloadable_connections! - synchronize do - @reserved_connections.clear - @connections.each do |conn| - checkin conn - conn.disconnect! if conn.requires_reloading? - end - @connections.delete_if(&:requires_reloading?) - @available.clear - @connections.each do |conn| - @available.add conn + # Disconnects all connections in the pool, and clears the pool. + # + # The pool first tries to gain ownership of all connections, if unable to + # do so within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds), the pool is forcefully + # disconneted wihout any regard for other connection owning threads. + def disconnect! + disconnect(false) + end + + # Clears the cache which maps classes and re-connects connections that + # require reloading. + # + # Raises: + # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all + # connections in the pool within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds). + def clear_reloadable_connections(raise_on_acquisition_timeout = true) + num_new_conns_required = 0 + + with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do + synchronize do + @connections.each do |conn| + checkin conn + conn.disconnect! if conn.requires_reloading? + end + @connections.delete_if(&:requires_reloading?) + + @available.clear + + if @connections.size < @size + # because of the pruning done by this method, we might be running + # low on connections, while threads stuck in queue are helpless + # (not being able to establish new connections for themselves), + # see also more detailed explanation in +remove+ + num_new_conns_required = num_waiting_in_queue - @connections.size + end + + @connections.each do |conn| + @available.add conn + end end end + + bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0 + end + + # Clears the cache which maps classes and re-connects connections that + # require reloading. + # + # The pool first tries to gain ownership of all connections, if unable to + # do so within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds), the pool forcefully + # clears the cache and reloads connections without any regard for other + # connection owning threads. + def clear_reloadable_connections! + clear_reloadable_connections(false) end # Check-out a database connection from the pool, indicating that you want @@ -340,12 +495,8 @@ module ActiveRecord # # Raises: # - ConnectionTimeoutError: no connection can be obtained from the pool. - def checkout - synchronize do - conn = acquire_connection - conn.lease - checkout_and_verify(conn) - end + def checkout(checkout_timeout = @checkout_timeout) + checkout_and_verify(acquire_connection(checkout_timeout)) end # Check-in a database connection back into the pool, indicating that you @@ -355,14 +506,12 @@ module ActiveRecord # calling +checkout+ on this pool. def checkin(conn) synchronize do - owner = conn.owner + remove_connection_from_thread_cache conn conn.run_callbacks :checkin do conn.expire end - release conn, owner - @available.add conn end end @@ -370,14 +519,32 @@ module ActiveRecord # Remove a connection from the connection pool. The connection will # remain open and active but will no longer be managed by this pool. def remove(conn) + needs_new_connection = false + synchronize do + remove_connection_from_thread_cache conn + @connections.delete conn @available.delete conn - release conn, conn.owner - - @available.add checkout_new_connection if @available.any_waiting? + # @available.any_waiting? => true means that prior to removing this + # conn, the pool was at its max size (@connections.size == @size) + # this would mean that any threads stuck waiting in the queue wouldn't + # know they could checkout_new_connection, so let's do it for them. + # Because condition-wait loop is encapsulated in the Queue class + # (that in turn is oblivious to ConnectionPool implementation), threads + # that are "stuck" there are helpless, they have no way of creating + # new connections and are completely reliant on us feeding available + # connections into the Queue. + needs_new_connection = @available.any_waiting? end + + # This is intentionally done outside of the synchronized section as we + # would like not to hold the main mutex while checking out new connections, + # thus there is some chance that needs_new_connection information is now + # stale, we can live with that (bulk_make_new_connections will make + # sure not to exceed the pool's @size limit). + bulk_make_new_connections(1) if needs_new_connection end # Recover lost connections for the pool. A lost connection can occur if @@ -402,7 +569,118 @@ module ActiveRecord end end + def num_waiting_in_queue # :nodoc: + @available.num_waiting + end + private + #-- + # this is unfortunately not concurrent + def bulk_make_new_connections(num_new_conns_needed) + num_new_conns_needed.times do + # try_to_checkout_new_connection will not exceed pool's @size limit + if new_conn = try_to_checkout_new_connection + # make the new_conn available to the starving threads stuck @available Queue + checkin(new_conn) + end + end + end + + #-- + # From the discussion on Github: + # https://github.com/rails/rails/pull/14938#commitcomment-6601951 + # This hook-in method allows for easier monkey-patching fixes needed by + # JRuby users that use Fibers. + def connection_cache_key(thread) + thread + end + + # Take control of all existing connections so a "group" action such as + # reload/disconnect can be performed safely. It is no longer enough to + # wrap it in +synchronize+ because some pool's actions are allowed + # to be performed outside of the main +synchronize+ block. + def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true) + with_new_connections_blocked do + attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout) + yield + end + end + + def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true) + collected_conns = synchronize do + # account for our own connections + @connections.select {|conn| conn.owner == Thread.current} + end + + newly_checked_out = [] + timeout_time = Time.now + (@checkout_timeout * 2) + + @available.with_a_bias_for(Thread.current) do + while true + synchronize do + return if collected_conns.size == @connections.size && @now_connecting == 0 + remaining_timeout = timeout_time - Time.now + remaining_timeout = 0 if remaining_timeout < 0 + conn = checkout_for_exclusive_access(remaining_timeout) + collected_conns << conn + newly_checked_out << conn + end + end + end + rescue ExclusiveConnectionTimeoutError + # `raise_on_acquisition_timeout == false` means we are directed to ignore any + # timeouts and are expected to just give up: we've obtained as many connections + # as possible, note that in a case like that we don't return any of the + # `newly_checked_out` connections. + + if raise_on_acquisition_timeout + release_newly_checked_out = true + raise + end + rescue Exception # if something else went wrong + # this can't be a "naked" rescue, because we have should return conns + # even for non-StandardErrors + release_newly_checked_out = true + raise + ensure + if release_newly_checked_out && newly_checked_out + # releasing only those conns that were checked out in this method, conns + # checked outside this method (before it was called) are not for us to release + newly_checked_out.each {|conn| checkin(conn)} + end + end + + #-- + # Must be called in a synchronize block. + def checkout_for_exclusive_access(checkout_timeout) + checkout(checkout_timeout) + rescue ConnectionTimeoutError + # this block can't be easily moved into attempt_to_checkout_all_existing_connections's + # rescue block, because doing so would put it outside of synchronize section, without + # being in a critical section thread_report might become inaccurate + msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds" + + thread_report = [] + @connections.each do |conn| + unless conn.owner == Thread.current + thread_report << "#{conn} is owned by #{conn.owner}" + end + end + + msg << " (#{thread_report.join(', ')})" if thread_report.any? + + raise ExclusiveConnectionTimeoutError, msg + end + + def with_new_connections_blocked + previous_value = nil + synchronize do + previous_value, @new_cons_enabled = @new_cons_enabled, false + end + yield + ensure + synchronize { @new_cons_enabled = previous_value } + end # Acquire a connection by one of 1) immediately removing one # from the queue of available connections, 2) creating a new @@ -411,24 +689,31 @@ module ActiveRecord # # Raises: # - ConnectionTimeoutError if a connection could not be acquired - def acquire_connection - if conn = @available.poll + # + #-- + # Implementation detail: the connection returned by +acquire_connection+ + # will already be "+connection.lease+ -ed" to the current thread. + def acquire_connection(checkout_timeout) + # NOTE: we rely on `@available.poll` and `try_to_checkout_new_connection` to + # `conn.lease` the returned connection (and to do this in a `synchronized` + # section), this is not the cleanest implementation, as ideally we would + # `synchronize { conn.lease }` in this method, but by leaving it to `@available.poll` + # and `try_to_checkout_new_connection` we can piggyback on `synchronize` sections + # of the said methods and avoid an additional `synchronize` overhead. + if conn = @available.poll || try_to_checkout_new_connection conn - elsif @connections.size < @size - checkout_new_connection else reap - @available.poll(@checkout_timeout) + @available.poll(checkout_timeout) end end - def release(conn, owner) - thread_id = owner.object_id - - if @reserved_connections[thread_id] == conn - @reserved_connections.delete thread_id - end + #-- + # if owner_thread param is omitted, this must be called in synchronize block + def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) + @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn) end + alias_method :release, :remove_connection_from_thread_cache def new_connection Base.send(spec.adapter_method, spec.config).tap do |conn| @@ -436,17 +721,46 @@ module ActiveRecord end end - def current_connection_id #:nodoc: - Base.connection_id ||= Thread.current.object_id + # If the pool is not at a +@size+ limit, establish new connection. Connecting + # to the DB is done outside main synchronized section. + #-- + # Implementation constraint: a newly established connection returned by this + # method must be in the +.leased+ state. + def try_to_checkout_new_connection + # first in synchronized section check if establishing new conns is allowed + # and increment @now_connecting, to prevent overstepping this pool's @size + # constraint + do_checkout = synchronize do + if @new_cons_enabled && (@connections.size + @now_connecting) < @size + @now_connecting += 1 + end + end + if do_checkout + begin + # if successfully incremented @now_connecting establish new connection + # outside of synchronized section + conn = checkout_new_connection + ensure + synchronize do + if conn + adopt_connection(conn) + # returned conn needs to be already leased + conn.lease + end + @now_connecting -= 1 + end + end + end + end + + def adopt_connection(conn) + conn.pool = self + @connections << conn end def checkout_new_connection raise ConnectionNotEstablished unless @automatic_reconnect - - c = new_connection - c.pool = self - @connections << c - c + new_connection end def checkout_and_verify(c) diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 0705c22a8c..60655b673c 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -7,7 +7,6 @@ require 'active_record/connection_adapters/schema_cache' require 'active_record/connection_adapters/sql_type_metadata' require 'active_record/connection_adapters/abstract/schema_dumper' require 'active_record/connection_adapters/abstract/schema_creation' -require 'monitor' require 'arel/collectors/bind' require 'arel/collectors/sql_string' @@ -70,7 +69,6 @@ module ActiveRecord include DatabaseLimits include QueryCache include ActiveSupport::Callbacks - include MonitorMixin include ColumnDumper SIMPLE_INT = /\A\d+\z/ @@ -141,12 +139,20 @@ module ActiveRecord SchemaCreation.new self end + # this method must only be called while holding connection pool's mutex def lease - synchronize do - unless in_use? - @owner = Thread.current + if in_use? + msg = 'Cannot lease connection, ' + if @owner == Thread.current + msg << 'it is already leased by the current thread.' + else + msg << "it is already in use by a different thread: #{@owner}. " << + "Current thread: #{Thread.current}." end + raise ActiveRecordError, msg end + + @owner = Thread.current end def schema_cache=(cache) @@ -154,6 +160,7 @@ module ActiveRecord @schema_cache = cache end + # this method must only be called while holding connection pool's mutex def expire @owner = nil end diff --git a/activerecord/lib/active_record/connection_handling.rb b/activerecord/lib/active_record/connection_handling.rb index 24f5849e45..d6b661ff76 100644 --- a/activerecord/lib/active_record/connection_handling.rb +++ b/activerecord/lib/active_record/connection_handling.rb @@ -88,7 +88,7 @@ module ActiveRecord end def connection_id - ActiveRecord::RuntimeRegistry.connection_id + ActiveRecord::RuntimeRegistry.connection_id ||= Thread.current.object_id end def connection_id=(connection_id) |