From 029952edf464b94184d9b48f3bdff49d2746d721 Mon Sep 17 00:00:00 2001 From: Nick Sieger Date: Mon, 4 Aug 2008 22:43:32 -0700 Subject: Extract a base class for connection pools, start to flesh out reserve/release API --- .../abstract/connection_pool.rb | 189 ++++++++++++--------- 1 file changed, 105 insertions(+), 84 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index ca9f0a5d9b..cebff0262d 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -3,135 +3,156 @@ require 'set' module ActiveRecord module ConnectionAdapters + # Connection pool API for ActiveRecord database connections. class ConnectionPool - delegate :verification_timeout, :to => "::ActiveRecord::Base" - attr_reader :active_connections, :spec + # Factory method for connection pools. + # Determines pool type to use based on contents of connection specification. + # FIXME: specification configuration TBD. + def self.create(spec) + ConnectionPerThread.new(spec) + end + delegate :verification_timeout, :to => "::ActiveRecord::Base" + attr_reader :spec def initialize(spec) - # The thread id -> adapter cache. - @active_connections = {} - - # The ConnectionSpecification for this pool @spec = spec - + # The cache of reserved connections mapped to threads + @reserved_connections = {} # The mutex used to synchronize pool access @connection_mutex = Monitor.new end - def active_connection_name #:nodoc: - Thread.current.object_id + # Retrieve the connection reserved for the current thread, or call #reserve to obtain one + # if necessary. + def open_connection + if conn = @reserved_connections[active_connection_name] + conn.verify!(verification_timeout) + conn + else + @reserved_connections[active_connection_name] = reserve + end end + alias connection open_connection - def active_connection - active_connections[active_connection_name] + def close_connection + conn = @reserved_connections.delete(active_connection_name) + release conn if conn end - # Returns the connection currently associated with the class. This can - # also be used to "borrow" the connection to do database work unrelated - # to any of the specific Active Records. - def connection - if conn = active_connections[active_connection_name] - conn - else - # retrieve_connection sets the cache key. - conn = retrieve_connection - active_connections[active_connection_name] = conn - end + # Returns true if a connection has already been opened. + def connected? + !connections.empty? end - # Clears the cache which maps classes to connections. - def clear_active_connections! - clear_entries!(@active_connections, [active_connection_name]) do |name, conn| + # Reserve (check-out) a database connection for the current thread. + def reserve + raise NotImplementedError, "reserve is an abstract method" + end + alias checkout reserve + + # Release (check-in) a database connection for the current thread. + def release(connection) + raise NotImplementedError, "release is an abstract method" + end + alias checkin release + + # Disconnect all connections in the pool. + def disconnect! + @reserved_connections.each do |name,conn| + release(conn) + end + connections.each do |conn| conn.disconnect! end + @reserved_connections = {} end # Clears the cache which maps classes def clear_reloadable_connections! - @active_connections.each do |name, conn| + @reserved_connections.each do |name, conn| + release(conn) + end + @reserved_connections = {} + connections.each do |conn| if conn.requires_reloading? conn.disconnect! - @active_connections.delete(name) + remove_connection conn end end end # Verify active connections. def verify_active_connections! #:nodoc: - remove_stale_cached_threads!(@active_connections) do |name, conn| - conn.disconnect! + remove_stale_cached_threads!(@reserved_connections) do |name, conn| + release(conn) end - active_connections.each_value do |connection| + connections.each do |connection| connection.verify!(verification_timeout) end end - def retrieve_connection #:nodoc: - # Name is nil if establish_connection hasn't been called for - # some class along the inheritance chain up to AR::Base yet. - name = active_connection_name - if conn = active_connections[name] - # Verify the connection. - conn.verify!(verification_timeout) - else - self.set_connection spec - conn = active_connections[name] - end + synchronize :open_connection, :close_connection, :reserve, :release, + :clear_reloadable_connections!, :verify_active_connections!, + :connected?, :disconnect!, :with => :@connection_mutex - conn or raise ConnectionNotEstablished + private + def active_connection_name #:nodoc: + Thread.current.object_id end - # Returns true if a connection that's accessible to this class has already been opened. - def connected? - active_connections[active_connection_name] ? true : false + def remove_connection(conn) + raise NotImplementedError, "remove_connection is an abstract method" end - # Disconnect all connections in the pool. - def disconnect! - clear_cache!(@active_connections) do |name, conn| - conn.disconnect! - end + # Array containing all connections (reserved or available) in the pool. + def connections + raise NotImplementedError, "connections is an abstract method" end - # Set the connection for the class. - def set_connection(spec) #:nodoc: - if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter) - active_connections[active_connection_name] = spec - elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification) - config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency) - self.set_connection ActiveRecord::Base.send(spec.adapter_method, config) - else - raise ConnectionNotEstablished + # Remove stale threads from the cache. + def remove_stale_cached_threads!(cache, &block) + keys = Set.new(cache.keys) + + Thread.list.each do |thread| + keys.delete(thread.object_id) if thread.alive? + end + keys.each do |key| + next unless cache.has_key?(key) + block.call(key, cache[key]) + cache.delete(key) end end + end + + class ConnectionPerThread < ConnectionPool + def active_connection + @reserved_connections[active_connection_name] + end - synchronize :active_connection, :connection, :clear_active_connections!, - :clear_reloadable_connections!, :verify_active_connections!, :retrieve_connection, - :connected?, :disconnect!, :set_connection, :with => :@connection_mutex + def active_connections; @reserved_connections; end - private - def clear_cache!(cache, &block) - cache.each(&block) if block_given? - cache.clear - end + def reserve + new_connection + end - # Remove stale threads from the cache. - def remove_stale_cached_threads!(cache, &block) - stale = Set.new(cache.keys) + def release(conn) + conn.disconnect! + end - Thread.list.each do |thread| - stale.delete(thread.object_id) if thread.alive? - end - clear_entries!(cache, stale, &block) - end + private + # Set the connection for the class. + def new_connection + config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency) + ActiveRecord::Base.send(spec.adapter_method, config) + end - def clear_entries!(cache, keys, &block) - keys.each do |key| - next unless cache.has_key?(key) - block.call(key, cache[key]) - cache.delete(key) - end - end + def connections + @reserved_connections.values + end + + def remove_connection(conn) + @reserved_connections.delete_if {|k,v| v == conn} + end end module ConnectionHandlerMethods @@ -144,7 +165,7 @@ module ActiveRecord end def establish_connection(name, spec) - @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec) + @connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec) end # for internal use only and for testing @@ -158,7 +179,7 @@ module ActiveRecord # Clears the cache which maps classes to connections. def clear_active_connections! - @connection_pools.each_value {|pool| pool.clear_active_connections! } + @connection_pools.each_value {|pool| pool.close_connection } end # Clears the cache which maps classes -- cgit v1.2.3