diff options
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb')
-rw-r--r-- | activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb | 321 |
1 files changed, 173 insertions, 148 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 698da34d26..a609867898 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -2,6 +2,7 @@ require 'thread' require 'monitor' require 'set' require 'active_support/core_ext/module/deprecation' +require 'timeout' module ActiveRecord # Raised when a connection could not be obtained within the connection @@ -9,6 +10,10 @@ module ActiveRecord class ConnectionTimeoutError < ConnectionNotEstablished end + # Raised when a connection pool is full and another connection is requested + class PoolFullError < ConnectionNotEstablished + end + module ConnectionAdapters # Connection pool base class for managing Active Record database # connections. @@ -57,10 +62,50 @@ module ActiveRecord # * +wait_timeout+: number of seconds to block and wait for a connection # before giving up and raising a timeout error (default 5 seconds). class ConnectionPool + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. + # A reaper instantiated with a nil frequency will never reap the + # connection pool. + # + # Configure the frequency by setting "reaping_frequency" in your + # database yaml file. + class Reaper + attr_reader :pool, :frequency + + def initialize(pool, frequency) + @pool = pool + @frequency = frequency + end + + def run + return unless frequency + Thread.new(frequency, pool) { |t, p| + while true + sleep t + p.reap + end + } + end + end + include MonitorMixin - attr_accessor :automatic_reconnect - attr_reader :spec, :connections + attr_accessor :automatic_reconnect, :timeout + attr_reader :spec, :connections, :size, :reaper + + class Latch # :nodoc: + def initialize + @mutex = Mutex.new + @cond = ConditionVariable.new + end + + def release + @mutex.synchronize { @cond.broadcast } + end + + def await + @mutex.synchronize { @cond.wait @mutex } + end + end # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, @@ -76,12 +121,14 @@ module ActiveRecord # The cache of reserved connections mapped to threads @reserved_connections = {} - @queue = new_cond @timeout = spec.config[:wait_timeout] || 5 + @reaper = Reaper.new self, spec.config[:reaping_frequency] + @reaper.run # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 + @latch = Latch.new @connections = [] @automatic_reconnect = true end @@ -95,18 +142,21 @@ module ActiveRecord @reserved_connections[current_connection_id] ||= checkout end - # Check to see if there is an active connection in this connection - # pool. + # Is there an open connection that is being used for the current thread? def active_connection? - active_connections.any? + @reserved_connections.fetch(current_connection_id) { + return false + }.in_use? end # Signal that the thread is finished with the current connection. # #release_connection releases the connection-thread association # and returns the connection to the pool. def release_connection(with_id = current_connection_id) - conn = @reserved_connections.delete(with_id) - checkin conn if conn + synchronize do + conn = @reserved_connections.delete(with_id) + checkin conn if conn + end end # If a connection already exists yield it to the block. If no connection @@ -151,109 +201,43 @@ module ActiveRecord end end - # Verify active connections and remove and disconnect connections - # associated with stale threads. - def verify_active_connections! #:nodoc: - synchronize do - clear_stale_cached_connections! - @connections.each do |connection| - connection.verify! - end - end - end - - def columns - with_connection do |c| - c.schema_cache.columns - end - end - deprecate :columns - - def columns_hash - with_connection do |c| - c.schema_cache.columns_hash - end - end - deprecate :columns_hash - - def primary_keys - with_connection do |c| - c.schema_cache.primary_keys - end - end - deprecate :primary_keys - - def clear_cache! - with_connection do |c| - c.schema_cache.clear! - end - end - deprecate :clear_cache! - - # Return any checked-out connections back to the pool by threads that - # are no longer alive. - def clear_stale_cached_connections! - keys = @reserved_connections.keys - Thread.list.find_all { |t| - t.alive? - }.map { |thread| thread.object_id } - keys.each do |key| - conn = @reserved_connections[key] - ActiveSupport::Deprecation.warn(<<-eowarn) if conn.in_use? -Database connections will not be closed automatically, please close your -database connection at the end of the thread by calling `close` on your -connection. For example: ActiveRecord::Base.connection.close - eowarn - checkin conn - @reserved_connections.delete(key) - end + def clear_stale_cached_connections! # :nodoc: + reap end + deprecate :clear_stale_cached_connections! => "Please use #reap instead" # Check-out a database connection from the pool, indicating that you want # to use it. You should call #checkin when you no longer need this. # - # This is done by either returning an existing connection, or by creating - # a new connection. If the maximum number of connections for this pool has - # already been reached, but the pool is empty (i.e. they're all being used), - # then this method will wait until a thread has checked in a connection. - # The wait time is bounded however: if no connection can be checked out - # within the timeout specified for this pool, then a ConnectionTimeoutError - # exception will be raised. + # This is done by either returning and leasing existing connection, or by + # creating a new connection and leasing it. + # + # If all connections are leased and the pool is at capacity (meaning the + # number of currently leased connections is greater than or equal to the + # size limit set), an ActiveRecord::PoolFullError exception will be raised. # # Returns: an AbstractAdapter object. # # Raises: - # - ConnectionTimeoutError: no connection can be obtained from the pool - # within the timeout period. + # - PoolFullError: no connection can be obtained from the pool. def checkout - # Checkout an available connection - synchronize do - loop do - conn = @connections.find { |c| c.lease } - - unless conn - if @connections.size < @size - conn = checkout_new_connection - conn.lease - end - end - - if conn - checkout_and_verify conn - return conn - end - - @queue.wait(@timeout) - - if(active_connections.size < @connections.size) - next - else - clear_stale_cached_connections! - if @size == active_connections.size - raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}. The max pool size is currently #{@size}; consider increasing it." - end + loop do + # Checkout an available connection + synchronize do + # Try to find a connection that hasn't been leased, and lease it + conn = connections.find { |c| c.lease } + + # If all connections were leased, and we have room to expand, + # create a new connection and lease it. + if !conn && connections.size < size + conn = checkout_new_connection + conn.lease end + return checkout_and_verify(conn) if conn end + + Timeout.timeout(@timeout, PoolFullError) { @latch.await } end end @@ -266,13 +250,55 @@ connection. For example: ActiveRecord::Base.connection.close synchronize do conn.run_callbacks :checkin do conn.expire - @queue.signal end + + release conn end + @latch.release + end + + # Remove a connection from the connection pool. The connection will + # remain open and active but will no longer be managed by this pool. + def remove(conn) + synchronize do + @connections.delete conn + + # FIXME: we might want to store the key on the connection so that removing + # from the reserved hash will be a little easier. + release conn + end + @latch.release + end + + # Removes dead connections from the pool. A dead connection can occur + # if a programmer forgets to close a connection at the end of a thread + # or a thread dies unexpectedly. + def reap + synchronize do + stale = Time.now - @timeout + connections.dup.each do |conn| + remove conn if conn.in_use? && stale > conn.last_use && !conn.active? + end + end + @latch.release end private + def release(conn) + thread_id = nil + + if @reserved_connections[current_connection_id] == conn + thread_id = current_connection_id + else + thread_id = @reserved_connections.keys.find { |k| + @reserved_connections[k] == conn + } + end + + @reserved_connections.delete thread_id if thread_id + end + def new_connection ActiveRecord::Base.send(spec.adapter_method, spec.config) end @@ -296,10 +322,6 @@ connection. For example: ActiveRecord::Base.connection.close end c end - - def active_connections - @connections.find_all { |c| c.in_use? } - end end # ConnectionHandler is a collection of ConnectionPool objects. It is used @@ -326,16 +348,18 @@ connection. For example: ActiveRecord::Base.connection.close # ActiveRecord::Base.connection_handler. Active Record models use this to # determine that connection pool that they should use. class ConnectionHandler - attr_reader :connection_pools - - def initialize(pools = {}) + def initialize(pools = Hash.new { |h,k| h[k] = {} }) @connection_pools = pools - @class_to_pool = {} + @class_to_pool = Hash.new { |h,k| h[k] = {} } + end + + def connection_pools + @connection_pools[Process.pid] end def establish_connection(name, spec) - @connection_pools[spec] ||= ConnectionAdapters::ConnectionPool.new(spec) - @class_to_pool[name] = @connection_pools[spec] + set_pool_for_spec spec, ConnectionAdapters::ConnectionPool.new(spec) + set_class_to_pool name, connection_pools[spec] end # Returns true if there are any active connections among the connection @@ -348,21 +372,16 @@ connection. For example: ActiveRecord::Base.connection.close # and also returns connections to the pool cached by threads that are no # longer alive. def clear_active_connections! - @connection_pools.each_value {|pool| pool.release_connection } + connection_pools.each_value {|pool| pool.release_connection } end # Clears the cache which maps classes. def clear_reloadable_connections! - @connection_pools.each_value {|pool| pool.clear_reloadable_connections! } + connection_pools.each_value {|pool| pool.clear_reloadable_connections! } end def clear_all_connections! - @connection_pools.each_value {|pool| pool.disconnect! } - end - - # Verify active connections. - def verify_active_connections! #:nodoc: - @connection_pools.each_value {|pool| pool.verify_active_connections! } + connection_pools.each_value {|pool| pool.disconnect! } end # Locate the connection of the nearest super class. This can be an @@ -386,53 +405,56 @@ connection. For example: ActiveRecord::Base.connection.close # can be used as an argument for establish_connection, for easily # re-establishing the connection. def remove_connection(klass) - pool = @class_to_pool.delete(klass.name) + pool = class_to_pool.delete(klass.name) return nil unless pool - @connection_pools.delete pool.spec + connection_pools.delete pool.spec pool.automatic_reconnect = false pool.disconnect! pool.spec.config end def retrieve_connection_pool(klass) - pool = @class_to_pool[klass.name] + pool = get_pool_for_class klass.name return pool if pool - return nil if ActiveRecord::Base == klass - retrieve_connection_pool klass.superclass + return nil if ActiveRecord::Model == klass + retrieve_connection_pool klass.active_record_super end - end - - class ConnectionManagement - class Proxy # :nodoc: - attr_reader :body, :testing - def initialize(body, testing = false) - @body = body - @testing = testing - end + private - def method_missing(method_sym, *arguments, &block) - @body.send(method_sym, *arguments, &block) - end + def class_to_pool + @class_to_pool[Process.pid] + end - def respond_to?(method_sym, include_private = false) - super || @body.respond_to?(method_sym) - end + def set_pool_for_spec(spec, pool) + @connection_pools[Process.pid][spec] = pool + end - def each(&block) - body.each(&block) - end + def set_class_to_pool(name, pool) + @class_to_pool[Process.pid][name] = pool + pool + end - def close - body.close if body.respond_to?(:close) + def get_pool_for_class(klass) + @class_to_pool[Process.pid].fetch(klass) { + c_to_p = @class_to_pool.values.find { |class_to_pool| + class_to_pool[klass] + } - # Don't return connection (and perform implicit rollback) if - # this request is a part of integration test - ActiveRecord::Base.clear_active_connections! unless testing - end + if c_to_p + pool = c_to_p[klass] + pool = ConnectionAdapters::ConnectionPool.new pool.spec + set_pool_for_spec pool.spec, pool + set_class_to_pool klass, pool + else + set_class_to_pool klass, nil + end + } end + end + class ConnectionManagement def initialize(app) @app = app end @@ -440,9 +462,12 @@ connection. For example: ActiveRecord::Base.connection.close def call(env) testing = env.key?('rack.test') - status, headers, body = @app.call(env) + response = @app.call(env) + response[2] = ::Rack::BodyProxy.new(response[2]) do + ActiveRecord::Base.clear_active_connections! unless testing + end - [status, headers, Proxy.new(body, testing)] + response rescue ActiveRecord::Base.clear_active_connections! unless testing raise |