diff options
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb')
-rw-r--r-- | activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb | 167 |
1 files changed, 123 insertions, 44 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 3f2e86a98d..e977d36cb9 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require "thread" require "concurrent/map" require "monitor" @@ -61,15 +63,13 @@ module ActiveRecord # There are several connection-pooling-related options that you can add to # your database connection configuration: # - # * +pool+: number indicating size of connection pool (default 5) - # * +checkout_timeout+: number of seconds to block and wait for a connection - # before giving up and raising a timeout error (default 5 seconds). - # * +reaping_frequency+: frequency in seconds to periodically run the - # Reaper, which attempts to find and recover connections from dead - # threads, which can occur if a programmer forgets to close a - # connection at the end of a thread or a thread dies unexpectedly. - # Regardless of this setting, the Reaper will be invoked before every - # blocking wait. (Default +nil+, which means don't schedule the Reaper). + # * +pool+: maximum number of connections the pool may manage (default 5). + # * +idle_timeout+: number of seconds that a connection will be kept + # unused in the pool before it is automatically disconnected (default + # 300 seconds). Set this to zero to keep connections forever. + # * +checkout_timeout+: number of seconds to wait for a connection to + # become available before giving up and raising a timeout error (default + # 5 seconds). # #-- # Synchronization policy: @@ -80,11 +80,8 @@ module ActiveRecord # * private methods that require being called in a +synchronize+ blocks # are now explicitly documented class ConnectionPool - # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool - # with which it shares a Monitor. But could be a generic Queue. - # - # The Queue in stdlib's 'thread' could replace this class except - # stdlib's doesn't support waiting with a timeout. + # Threadsafe, fair, LIFO queue. Meant to be used by ConnectionPool + # with which it shares a Monitor. class Queue def initialize(lock = Monitor.new) @lock = lock @@ -173,7 +170,7 @@ module ActiveRecord # Removes and returns the head of the queue if possible, or +nil+. def remove - @queue.shift + @queue.pop end # Remove and return the head the queue if the number of @@ -191,7 +188,9 @@ module ActiveRecord t0 = Time.now elapsed = 0 loop do - @cond.wait(timeout - elapsed) + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + @cond.wait(timeout - elapsed) + end return remove if any? @@ -268,7 +267,7 @@ module ActiveRecord # Connections must be leased while holding the main pool mutex. This is # an internal subclass that also +.leases+ returned connections while # still in queue's critical section (queue synchronizes with the same - # +@lock+ as the main pool) so that a returned connection is already + # <tt>@lock</tt> as the main pool) so that a returned connection is already # leased and there is no need to re-enter synchronized block. class ConnectionLeasingQueue < Queue # :nodoc: include BiasableQueue @@ -281,12 +280,12 @@ module ActiveRecord end end - # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. - # A reaper instantiated with a +nil+ frequency will never reap the - # connection pool. + # Every +frequency+ seconds, the reaper will call +reap+ and +flush+ on + # +pool+. A reaper instantiated with a zero frequency will never reap + # the connection pool. # - # Configure the frequency by setting "reaping_frequency" in your - # database yaml file. + # Configure the frequency by setting +reaping_frequency+ in your database + # yaml file (default 60 seconds). class Reaper attr_reader :pool, :frequency @@ -296,11 +295,12 @@ module ActiveRecord end def run - return unless frequency + return unless frequency && frequency > 0 Thread.new(frequency, pool) { |t, p| loop do sleep t p.reap + p.flush end } end @@ -324,8 +324,10 @@ module ActiveRecord @spec = spec @checkout_timeout = (spec.config[:checkout_timeout] && spec.config[:checkout_timeout].to_f) || 5 - @reaper = Reaper.new(self, (spec.config[:reaping_frequency] && spec.config[:reaping_frequency].to_f)) - @reaper.run + if @idle_timeout = spec.config.fetch(:idle_timeout, 300) + @idle_timeout = @idle_timeout.to_f + @idle_timeout = nil if @idle_timeout <= 0 + end # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 @@ -338,7 +340,7 @@ module ActiveRecord # then that +thread+ does indeed own that +conn+. However, an absence of a such # mapping does not mean that the +thread+ doesn't own the said connection. In # that case +conn.owner+ attr should be consulted. - # Access and modification of +@thread_cached_conns+ does not require + # Access and modification of <tt>@thread_cached_conns</tt> does not require # synchronization. @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size) @@ -355,6 +357,12 @@ module ActiveRecord @available = ConnectionLeasingQueue.new self @lock_thread = false + + # +reaping_frequency+ is configurable mostly for historical reasons, but it could + # also be useful if someone wants a very low +idle_timeout+. + reaping_frequency = spec.config.fetch(:reaping_frequency, 60) + @reaper = Reaper.new(self, reaping_frequency && reaping_frequency.to_f) + @reaper.run end def lock_thread=(lock_thread) @@ -447,6 +455,21 @@ module ActiveRecord disconnect(false) end + # Discards all connections in the pool (even if they're currently + # leased!), along with the pool itself. Any further interaction with the + # pool (except #spec and #schema_cache) is undefined. + # + # See AbstractAdapter#discard! + def discard! # :nodoc: + synchronize do + return if @connections.nil? # already discarded + @connections.each do |conn| + conn.discard! + end + @connections = @available = @thread_cached_conns = nil + end + end + # Clears the cache which maps classes and re-connects connections that # require reloading. # @@ -506,14 +529,16 @@ module ActiveRecord # +conn+: an AbstractAdapter object, which was obtained by earlier by # calling #checkout on this pool. def checkin(conn) - synchronize do - remove_connection_from_thread_cache conn + conn.lock.synchronize do + synchronize do + remove_connection_from_thread_cache conn - conn._run_checkin_callbacks do - conn.expire - end + conn._run_checkin_callbacks do + conn.expire + end - @available.add conn + @available.add conn + end end end @@ -570,6 +595,35 @@ module ActiveRecord end end + # Disconnect all connections that have been idle for at least + # +minimum_idle+ seconds. Connections currently checked out, or that were + # checked in less than +minimum_idle+ seconds ago, are unaffected. + def flush(minimum_idle = @idle_timeout) + return if minimum_idle.nil? + + idle_connections = synchronize do + @connections.select do |conn| + !conn.in_use? && conn.seconds_idle >= minimum_idle + end.each do |conn| + conn.lease + + @available.delete conn + @connections.delete conn + end + end + + idle_connections.each do |conn| + conn.disconnect! + end + end + + # Disconnect all currently idle connections. Connections currently checked + # out are unaffected. + def flush! + reap + flush(-1) + end + def num_waiting_in_queue # :nodoc: @available.num_waiting end @@ -677,7 +731,7 @@ module ActiveRecord # this block can't be easily moved into attempt_to_checkout_all_existing_connections's # rescue block, because doing so would put it outside of synchronize section, without # being in a critical section thread_report might become inaccurate - msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds" + msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds".dup thread_report = [] @connections.each do |conn| @@ -732,10 +786,10 @@ module ActiveRecord # Implementation detail: the connection returned by +acquire_connection+ # will already be "+connection.lease+ -ed" to the current thread. def acquire_connection(checkout_timeout) - # NOTE: we rely on +@available.poll+ and +try_to_checkout_new_connection+ to + # NOTE: we rely on <tt>@available.poll</tt> and +try_to_checkout_new_connection+ to # +conn.lease+ the returned connection (and to do this in a +synchronized+ # section). This is not the cleanest implementation, as ideally we would - # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to +@available.poll+ + # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to <tt>@available.poll</tt> # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections # of the said methods and avoid an additional +synchronize+ overhead. if conn = @available.poll || try_to_checkout_new_connection @@ -759,7 +813,7 @@ module ActiveRecord end end - # If the pool is not at a +@size+ limit, establish new connection. Connecting + # If the pool is not at a <tt>@size</tt> limit, establish new connection. Connecting # to the DB is done outside main synchronized section. #-- # Implementation constraint: a newly established connection returned by this @@ -825,7 +879,7 @@ module ActiveRecord # end # # class Book < ActiveRecord::Base - # establish_connection "library_db" + # establish_connection :library_db # end # # class ScaryBook < Book @@ -857,15 +911,35 @@ module ActiveRecord # All Active Record models use this handler to determine the connection pool that they # should use. # - # The ConnectionHandler class is not coupled with the Active models, as it has no knowlodge + # The ConnectionHandler class is not coupled with the Active models, as it has no knowledge # about the model. The model needs to pass a specification name to the handler, - # in order to lookup the correct connection pool. + # in order to look up the correct connection pool. class ConnectionHandler + def self.unowned_pool_finalizer(pid_map) # :nodoc: + lambda do |_| + discard_unowned_pools(pid_map) + end + end + + def self.discard_unowned_pools(pid_map) # :nodoc: + pid_map.each do |pid, pools| + pools.values.compact.each(&:discard!) unless pid == Process.pid + end + end + def initialize # These caches are keyed by spec.name (ConnectionSpecification#name). @owner_to_pool = Concurrent::Map.new(initial_capacity: 2) do |h, k| + # Discard the parent's connection pools immediately; we have no need + # of them + ConnectionHandler.discard_unowned_pools(h) + h[k] = Concurrent::Map.new(initial_capacity: 2) end + + # Backup finalizer: if the forked child never needed a pool, the above + # early discard has not occurred + ObjectSpace.define_finalizer self, ConnectionHandler.unowned_pool_finalizer(@owner_to_pool) end def connection_pool_list @@ -919,6 +993,13 @@ module ActiveRecord connection_pool_list.each(&:disconnect!) end + # Disconnects all currently idle connections. + # + # See ConnectionPool#flush! for details. + def flush_idle_connections! + connection_pool_list.each(&:flush!) + end + # Locate the connection of the nearest super class. This can be an # active or defined connection: if it is the latter, it will be # opened and set as the active connection for the class it was defined @@ -926,16 +1007,14 @@ module ActiveRecord def retrieve_connection(spec_name) #:nodoc: pool = retrieve_connection_pool(spec_name) raise ConnectionNotEstablished, "No connection pool with '#{spec_name}' found." unless pool - conn = pool.connection - raise ConnectionNotEstablished, "No connection for '#{spec_name}' in connection pool" unless conn - conn + pool.connection end # Returns true if a connection that's accessible to this class has # already been opened. def connected?(spec_name) - conn = retrieve_connection_pool(spec_name) - conn && conn.connected? + pool = retrieve_connection_pool(spec_name) + pool && pool.connected? end # Remove the connection for this class. This will close the active |