aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord
diff options
context:
space:
mode:
Diffstat (limited to 'activerecord')
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb189
1 files changed, 105 insertions, 84 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index ca9f0a5d9b..cebff0262d 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -3,135 +3,156 @@ require 'set'
module ActiveRecord
module ConnectionAdapters
+ # Connection pool API for ActiveRecord database connections.
class ConnectionPool
- delegate :verification_timeout, :to => "::ActiveRecord::Base"
- attr_reader :active_connections, :spec
+ # Factory method for connection pools.
+ # Determines pool type to use based on contents of connection specification.
+ # FIXME: specification configuration TBD.
+ def self.create(spec)
+ ConnectionPerThread.new(spec)
+ end
+ delegate :verification_timeout, :to => "::ActiveRecord::Base"
+ attr_reader :spec
def initialize(spec)
- # The thread id -> adapter cache.
- @active_connections = {}
-
- # The ConnectionSpecification for this pool
@spec = spec
-
+ # The cache of reserved connections mapped to threads
+ @reserved_connections = {}
# The mutex used to synchronize pool access
@connection_mutex = Monitor.new
end
- def active_connection_name #:nodoc:
- Thread.current.object_id
+ # Retrieve the connection reserved for the current thread, or call #reserve to obtain one
+ # if necessary.
+ def open_connection
+ if conn = @reserved_connections[active_connection_name]
+ conn.verify!(verification_timeout)
+ conn
+ else
+ @reserved_connections[active_connection_name] = reserve
+ end
end
+ alias connection open_connection
- def active_connection
- active_connections[active_connection_name]
+ def close_connection
+ conn = @reserved_connections.delete(active_connection_name)
+ release conn if conn
end
- # Returns the connection currently associated with the class. This can
- # also be used to "borrow" the connection to do database work unrelated
- # to any of the specific Active Records.
- def connection
- if conn = active_connections[active_connection_name]
- conn
- else
- # retrieve_connection sets the cache key.
- conn = retrieve_connection
- active_connections[active_connection_name] = conn
- end
+ # Returns true if a connection has already been opened.
+ def connected?
+ !connections.empty?
end
- # Clears the cache which maps classes to connections.
- def clear_active_connections!
- clear_entries!(@active_connections, [active_connection_name]) do |name, conn|
+ # Reserve (check-out) a database connection for the current thread.
+ def reserve
+ raise NotImplementedError, "reserve is an abstract method"
+ end
+ alias checkout reserve
+
+ # Release (check-in) a database connection for the current thread.
+ def release(connection)
+ raise NotImplementedError, "release is an abstract method"
+ end
+ alias checkin release
+
+ # Disconnect all connections in the pool.
+ def disconnect!
+ @reserved_connections.each do |name,conn|
+ release(conn)
+ end
+ connections.each do |conn|
conn.disconnect!
end
+ @reserved_connections = {}
end
# Clears the cache which maps classes
def clear_reloadable_connections!
- @active_connections.each do |name, conn|
+ @reserved_connections.each do |name, conn|
+ release(conn)
+ end
+ @reserved_connections = {}
+ connections.each do |conn|
if conn.requires_reloading?
conn.disconnect!
- @active_connections.delete(name)
+ remove_connection conn
end
end
end
# Verify active connections.
def verify_active_connections! #:nodoc:
- remove_stale_cached_threads!(@active_connections) do |name, conn|
- conn.disconnect!
+ remove_stale_cached_threads!(@reserved_connections) do |name, conn|
+ release(conn)
end
- active_connections.each_value do |connection|
+ connections.each do |connection|
connection.verify!(verification_timeout)
end
end
- def retrieve_connection #:nodoc:
- # Name is nil if establish_connection hasn't been called for
- # some class along the inheritance chain up to AR::Base yet.
- name = active_connection_name
- if conn = active_connections[name]
- # Verify the connection.
- conn.verify!(verification_timeout)
- else
- self.set_connection spec
- conn = active_connections[name]
- end
+ synchronize :open_connection, :close_connection, :reserve, :release,
+ :clear_reloadable_connections!, :verify_active_connections!,
+ :connected?, :disconnect!, :with => :@connection_mutex
- conn or raise ConnectionNotEstablished
+ private
+ def active_connection_name #:nodoc:
+ Thread.current.object_id
end
- # Returns true if a connection that's accessible to this class has already been opened.
- def connected?
- active_connections[active_connection_name] ? true : false
+ def remove_connection(conn)
+ raise NotImplementedError, "remove_connection is an abstract method"
end
- # Disconnect all connections in the pool.
- def disconnect!
- clear_cache!(@active_connections) do |name, conn|
- conn.disconnect!
- end
+ # Array containing all connections (reserved or available) in the pool.
+ def connections
+ raise NotImplementedError, "connections is an abstract method"
end
- # Set the connection for the class.
- def set_connection(spec) #:nodoc:
- if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
- active_connections[active_connection_name] = spec
- elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
- config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
- self.set_connection ActiveRecord::Base.send(spec.adapter_method, config)
- else
- raise ConnectionNotEstablished
+ # Remove stale threads from the cache.
+ def remove_stale_cached_threads!(cache, &block)
+ keys = Set.new(cache.keys)
+
+ Thread.list.each do |thread|
+ keys.delete(thread.object_id) if thread.alive?
+ end
+ keys.each do |key|
+ next unless cache.has_key?(key)
+ block.call(key, cache[key])
+ cache.delete(key)
end
end
+ end
+
+ class ConnectionPerThread < ConnectionPool
+ def active_connection
+ @reserved_connections[active_connection_name]
+ end
- synchronize :active_connection, :connection, :clear_active_connections!,
- :clear_reloadable_connections!, :verify_active_connections!, :retrieve_connection,
- :connected?, :disconnect!, :set_connection, :with => :@connection_mutex
+ def active_connections; @reserved_connections; end
- private
- def clear_cache!(cache, &block)
- cache.each(&block) if block_given?
- cache.clear
- end
+ def reserve
+ new_connection
+ end
- # Remove stale threads from the cache.
- def remove_stale_cached_threads!(cache, &block)
- stale = Set.new(cache.keys)
+ def release(conn)
+ conn.disconnect!
+ end
- Thread.list.each do |thread|
- stale.delete(thread.object_id) if thread.alive?
- end
- clear_entries!(cache, stale, &block)
- end
+ private
+ # Set the connection for the class.
+ def new_connection
+ config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
+ ActiveRecord::Base.send(spec.adapter_method, config)
+ end
- def clear_entries!(cache, keys, &block)
- keys.each do |key|
- next unless cache.has_key?(key)
- block.call(key, cache[key])
- cache.delete(key)
- end
- end
+ def connections
+ @reserved_connections.values
+ end
+
+ def remove_connection(conn)
+ @reserved_connections.delete_if {|k,v| v == conn}
+ end
end
module ConnectionHandlerMethods
@@ -144,7 +165,7 @@ module ActiveRecord
end
def establish_connection(name, spec)
- @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
+ @connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec)
end
# for internal use only and for testing
@@ -158,7 +179,7 @@ module ActiveRecord
# Clears the cache which maps classes to connections.
def clear_active_connections!
- @connection_pools.each_value {|pool| pool.clear_active_connections! }
+ @connection_pools.each_value {|pool| pool.close_connection }
end
# Clears the cache which maps classes