diff options
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb')
-rw-r--r-- | activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb | 294 |
1 files changed, 131 insertions, 163 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 20863e73aa..b8f99adc22 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -1,7 +1,7 @@ require 'thread' require 'monitor' require 'set' -require 'active_support/core_ext/module/synchronization' +require 'active_support/core_ext/module/deprecation' module ActiveRecord # Raised when a connection could not be obtained within the connection @@ -9,6 +9,13 @@ module ActiveRecord class ConnectionTimeoutError < ConnectionNotEstablished end + # Raised when a connection pool is full and another connection is requested + class PoolFullError < ConnectionNotEstablished + def initialize size, timeout + super("Connection pool of size #{size} and timeout #{timeout}s is full") + end + end + module ConnectionAdapters # Connection pool base class for managing Active Record database # connections. @@ -57,10 +64,35 @@ module ActiveRecord # * +wait_timeout+: number of seconds to block and wait for a connection # before giving up and raising a timeout error (default 5 seconds). class ConnectionPool - attr_accessor :automatic_reconnect - attr_reader :spec, :connections - attr_reader :columns, :columns_hash, :primary_keys, :tables - attr_reader :column_defaults + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. + # A reaper instantiated with a nil frequency will never reap the + # connection pool. + # + # Configure the frequency by setting "reaping_frequency" in your + # database yaml file. + class Reaper + attr_reader :pool, :frequency + + def initialize(pool, frequency) + @pool = pool + @frequency = frequency + end + + def run + return unless frequency + Thread.new(frequency, pool) { |t, p| + while true + sleep t + p.reap + end + } + end + end + + include MonitorMixin + + attr_accessor :automatic_reconnect, :timeout + attr_reader :spec, :connections, :size, :reaper # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, @@ -69,88 +101,22 @@ module ActiveRecord # # The default ConnectionPool maximum size is 5. def initialize(spec) + super() + @spec = spec # The cache of reserved connections mapped to threads @reserved_connections = {} - # The mutex used to synchronize pool access - @connection_mutex = Monitor.new - @queue = @connection_mutex.new_cond @timeout = spec.config[:wait_timeout] || 5 + @reaper = Reaper.new self, spec.config[:reaping_frequency] + @reaper.run # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 @connections = [] - @checked_out = [] @automatic_reconnect = true - @tables = {} - @visitor = nil - - @columns = Hash.new do |h, table_name| - h[table_name] = with_connection do |conn| - - # Fetch a list of columns - conn.columns(table_name, "#{table_name} Columns").tap do |columns| - - # set primary key information - columns.each do |column| - column.primary = column.name == primary_keys[table_name] - end - end - end - end - - @columns_hash = Hash.new do |h, table_name| - h[table_name] = Hash[columns[table_name].map { |col| - [col.name, col] - }] - end - - @column_defaults = Hash.new do |h, table_name| - h[table_name] = Hash[columns[table_name].map { |col| - [col.name, col.default] - }] - end - - @primary_keys = Hash.new do |h, table_name| - h[table_name] = with_connection do |conn| - table_exists?(table_name) ? conn.primary_key(table_name) : 'id' - end - end - end - - # A cached lookup for table existence. - def table_exists?(name) - return true if @tables.key? name - - with_connection do |conn| - conn.tables.each { |table| @tables[table] = true } - @tables[name] = true if !@tables.key?(name) && conn.table_exists?(name) - end - - @tables.key? name - end - - # Clears out internal caches: - # - # * columns - # * columns_hash - # * tables - def clear_cache! - @columns.clear - @columns_hash.clear - @column_defaults.clear - @tables.clear - end - - # Clear out internal caches for table with +table_name+. - def clear_table_cache!(table_name) - @columns.delete table_name - @columns_hash.delete table_name - @column_defaults.delete table_name - @primary_keys.delete table_name end # Retrieve the connection associated with the current thread, or call @@ -165,7 +131,7 @@ module ActiveRecord # Check to see if there is an active connection in this connection # pool. def active_connection? - @reserved_connections.key? current_connection_id + active_connections.any? end # Signal that the thread is finished with the current connection. @@ -181,7 +147,7 @@ module ActiveRecord # connection when finished. def with_connection connection_id = current_connection_id - fresh_connection = true unless @reserved_connections[connection_id] + fresh_connection = true unless active_connection? yield connection ensure release_connection(connection_id) if fresh_connection @@ -189,94 +155,80 @@ module ActiveRecord # Returns true if a connection has already been opened. def connected? - !@connections.empty? + synchronize { @connections.any? } end # Disconnects all connections in the pool, and clears the pool. def disconnect! - @reserved_connections.each do |name,conn| - checkin conn - end - @reserved_connections = {} - @connections.each do |conn| - conn.disconnect! + synchronize do + @reserved_connections = {} + @connections.each do |conn| + checkin conn + conn.disconnect! + end + @connections = [] end - @connections = [] end # Clears the cache which maps classes. def clear_reloadable_connections! - @reserved_connections.each do |name, conn| - checkin conn - end - @reserved_connections = {} - @connections.each do |conn| - conn.disconnect! if conn.requires_reloading? - end - @connections.delete_if do |conn| - conn.requires_reloading? + synchronize do + @reserved_connections = {} + @connections.each do |conn| + checkin conn + conn.disconnect! if conn.requires_reloading? + end + @connections.delete_if do |conn| + conn.requires_reloading? + end end end # Verify active connections and remove and disconnect connections # associated with stale threads. def verify_active_connections! #:nodoc: - clear_stale_cached_connections! - @connections.each do |connection| - connection.verify! + synchronize do + @connections.each do |connection| + connection.verify! + end end end - # Return any checked-out connections back to the pool by threads that - # are no longer alive. - def clear_stale_cached_connections! - keys = @reserved_connections.keys - Thread.list.find_all { |t| - t.alive? - }.map { |thread| thread.object_id } - keys.each do |key| - checkin @reserved_connections[key] - @reserved_connections.delete(key) - end + def clear_stale_cached_connections! # :nodoc: end + deprecate :clear_stale_cached_connections! # Check-out a database connection from the pool, indicating that you want # to use it. You should call #checkin when you no longer need this. # - # This is done by either returning an existing connection, or by creating - # a new connection. If the maximum number of connections for this pool has - # already been reached, but the pool is empty (i.e. they're all being used), - # then this method will wait until a thread has checked in a connection. - # The wait time is bounded however: if no connection can be checked out - # within the timeout specified for this pool, then a ConnectionTimeoutError - # exception will be raised. + # This is done by either returning and leasing existing connection, or by + # creating a new connection and leasing it. + # + # If all connections are leased and the pool is at capacity (meaning the + # number of currently leased connections is greater than or equal to the + # size limit set), an ActiveRecord::PoolFullError exception will be raised. # # Returns: an AbstractAdapter object. # # Raises: - # - ConnectionTimeoutError: no connection can be obtained from the pool - # within the timeout period. + # - PoolFullError: no connection can be obtained from the pool. def checkout # Checkout an available connection - @connection_mutex.synchronize do - loop do - conn = if @checked_out.size < @connections.size - checkout_existing_connection - elsif @connections.size < @size - checkout_new_connection - end - return conn if conn - - @queue.wait(@timeout) - - if(@checked_out.size < @connections.size) - next - else - clear_stale_cached_connections! - if @size == @checked_out.size - raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}. The max pool size is currently #{@size}; consider increasing it." - end - end + synchronize do + # Try to find a connection that hasn't been leased, and lease it + conn = connections.find { |c| c.lease } + + # If all connections were leased, and we have room to expand, + # create a new connection and lease it. + if !conn && connections.size < size + conn = checkout_new_connection + conn.lease + end + if conn + checkout_and_verify conn + else + raise PoolFullError.new(size, timeout) end end end @@ -287,56 +239,69 @@ module ActiveRecord # +conn+: an AbstractAdapter object, which was obtained by earlier by # calling +checkout+ on this pool. def checkin(conn) - @connection_mutex.synchronize do + synchronize do conn.run_callbacks :checkin do - @checked_out.delete conn - @queue.signal + conn.expire end end end - synchronize :clear_reloadable_connections!, :verify_active_connections!, - :connected?, :disconnect!, :with => :@connection_mutex + # Remove a connection from the connection pool. The connection will + # remain open and active but will no longer be managed by this pool. + def remove(conn) + synchronize do + @connections.delete conn - private + # FIXME: we might want to store the key on the connection so that removing + # from the reserved hash will be a little easier. + thread_id = @reserved_connections.keys.find { |k| + @reserved_connections[k] == conn + } + @reserved_connections.delete thread_id if thread_id + end + end - def new_connection - connection = ActiveRecord::Base.send(spec.adapter_method, spec.config) + # Removes dead connections from the pool. A dead connection can occur + # if a programmer forgets to close a connection at the end of a thread + # or a thread dies unexpectedly. + def reap + synchronize do + stale = Time.now - @timeout + connections.dup.each do |conn| + remove conn if conn.in_use? && stale > conn.last_use && !conn.active? + end + end + end - # TODO: This is a bit icky, and in the long term we may want to change the method - # signature for connections. Also, if we switch to have one visitor per - # connection (and therefore per thread), we can get rid of the thread-local - # variable in Arel::Visitors::ToSql. - @visitor ||= connection.class.visitor_for(self) - connection.visitor = @visitor + private - connection + def new_connection + ActiveRecord::Base.send(spec.adapter_method, spec.config) end def current_connection_id #:nodoc: - Thread.current.object_id + ActiveRecord::Base.connection_id ||= Thread.current.object_id end def checkout_new_connection raise ConnectionNotEstablished unless @automatic_reconnect c = new_connection + c.pool = self @connections << c - checkout_and_verify(c) - end - - def checkout_existing_connection - c = (@connections - @checked_out).first - checkout_and_verify(c) + c end def checkout_and_verify(c) c.run_callbacks :checkout do c.verify! - @checked_out << c end c end + + def active_connections + @connections.find_all { |c| c.in_use? } + end end # ConnectionHandler is a collection of ConnectionPool objects. It is used @@ -367,10 +332,12 @@ module ActiveRecord def initialize(pools = {}) @connection_pools = pools + @class_to_pool = {} end def establish_connection(name, spec) - @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec) + @connection_pools[spec] ||= ConnectionAdapters::ConnectionPool.new(spec) + @class_to_pool[name] = @connection_pools[spec] end # Returns true if there are any active connections among the connection @@ -421,19 +388,20 @@ module ActiveRecord # can be used as an argument for establish_connection, for easily # re-establishing the connection. def remove_connection(klass) - pool = @connection_pools.delete(klass.name) + pool = @class_to_pool.delete(klass.name) return nil unless pool + @connection_pools.delete pool.spec pool.automatic_reconnect = false pool.disconnect! pool.spec.config end def retrieve_connection_pool(klass) - pool = @connection_pools[klass.name] + pool = @class_to_pool[klass.name] return pool if pool - return nil if ActiveRecord::Base == klass - retrieve_connection_pool klass.superclass + return nil if ActiveRecord::Model == klass + retrieve_connection_pool klass.active_record_super end end |