diff options
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters/abstract')
-rw-r--r-- | activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb | 140 |
1 files changed, 85 insertions, 55 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index ea738cb305..b8f99adc22 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -9,6 +9,13 @@ module ActiveRecord class ConnectionTimeoutError < ConnectionNotEstablished end + # Raised when a connection pool is full and another connection is requested + class PoolFullError < ConnectionNotEstablished + def initialize size, timeout + super("Connection pool of size #{size} and timeout #{timeout}s is full") + end + end + module ConnectionAdapters # Connection pool base class for managing Active Record database # connections. @@ -57,10 +64,35 @@ module ActiveRecord # * +wait_timeout+: number of seconds to block and wait for a connection # before giving up and raising a timeout error (default 5 seconds). class ConnectionPool + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. + # A reaper instantiated with a nil frequency will never reap the + # connection pool. + # + # Configure the frequency by setting "reaping_frequency" in your + # database yaml file. + class Reaper + attr_reader :pool, :frequency + + def initialize(pool, frequency) + @pool = pool + @frequency = frequency + end + + def run + return unless frequency + Thread.new(frequency, pool) { |t, p| + while true + sleep t + p.reap + end + } + end + end + include MonitorMixin - attr_accessor :automatic_reconnect - attr_reader :spec, :connections + attr_accessor :automatic_reconnect, :timeout + attr_reader :spec, :connections, :size, :reaper # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, @@ -76,8 +108,9 @@ module ActiveRecord # The cache of reserved connections mapped to threads @reserved_connections = {} - @queue = new_cond @timeout = spec.config[:wait_timeout] || 5 + @reaper = Reaper.new self, spec.config[:reaping_frequency] + @reaper.run # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 @@ -155,76 +188,47 @@ module ActiveRecord # associated with stale threads. def verify_active_connections! #:nodoc: synchronize do - clear_stale_cached_connections! @connections.each do |connection| connection.verify! end end end - # Return any checked-out connections back to the pool by threads that - # are no longer alive. - def clear_stale_cached_connections! - keys = @reserved_connections.keys - Thread.list.find_all { |t| - t.alive? - }.map { |thread| thread.object_id } - keys.each do |key| - conn = @reserved_connections[key] - ActiveSupport::Deprecation.warn(<<-eowarn) if conn.in_use? -Database connections will not be closed automatically, please close your -database connection at the end of the thread by calling `close` on your -connection. For example: ActiveRecord::Base.connection.close - eowarn - checkin conn - @reserved_connections.delete(key) - end + def clear_stale_cached_connections! # :nodoc: end + deprecate :clear_stale_cached_connections! # Check-out a database connection from the pool, indicating that you want # to use it. You should call #checkin when you no longer need this. # - # This is done by either returning an existing connection, or by creating - # a new connection. If the maximum number of connections for this pool has - # already been reached, but the pool is empty (i.e. they're all being used), - # then this method will wait until a thread has checked in a connection. - # The wait time is bounded however: if no connection can be checked out - # within the timeout specified for this pool, then a ConnectionTimeoutError - # exception will be raised. + # This is done by either returning and leasing existing connection, or by + # creating a new connection and leasing it. + # + # If all connections are leased and the pool is at capacity (meaning the + # number of currently leased connections is greater than or equal to the + # size limit set), an ActiveRecord::PoolFullError exception will be raised. # # Returns: an AbstractAdapter object. # # Raises: - # - ConnectionTimeoutError: no connection can be obtained from the pool - # within the timeout period. + # - PoolFullError: no connection can be obtained from the pool. def checkout # Checkout an available connection synchronize do - loop do - conn = @connections.find { |c| c.lease } - - unless conn - if @connections.size < @size - conn = checkout_new_connection - conn.lease - end - end - - if conn - checkout_and_verify conn - return conn - end - - @queue.wait(@timeout) - - if(active_connections.size < @connections.size) - next - else - clear_stale_cached_connections! - if @size == active_connections.size - raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}. The max pool size is currently #{@size}; consider increasing it." - end - end + # Try to find a connection that hasn't been leased, and lease it + conn = connections.find { |c| c.lease } + + # If all connections were leased, and we have room to expand, + # create a new connection and lease it. + if !conn && connections.size < size + conn = checkout_new_connection + conn.lease + end + if conn + checkout_and_verify conn + else + raise PoolFullError.new(size, timeout) end end end @@ -238,7 +242,33 @@ connection. For example: ActiveRecord::Base.connection.close synchronize do conn.run_callbacks :checkin do conn.expire - @queue.signal + end + end + end + + # Remove a connection from the connection pool. The connection will + # remain open and active but will no longer be managed by this pool. + def remove(conn) + synchronize do + @connections.delete conn + + # FIXME: we might want to store the key on the connection so that removing + # from the reserved hash will be a little easier. + thread_id = @reserved_connections.keys.find { |k| + @reserved_connections[k] == conn + } + @reserved_connections.delete thread_id if thread_id + end + end + + # Removes dead connections from the pool. A dead connection can occur + # if a programmer forgets to close a connection at the end of a thread + # or a thread dies unexpectedly. + def reap + synchronize do + stale = Time.now - @timeout + connections.dup.each do |conn| + remove conn if conn.in_use? && stale > conn.last_use && !conn.active? end end end |