diff options
Diffstat (limited to 'activerecord')
3 files changed, 156 insertions, 22 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 28446fd64d..97c6e3e886 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -2,14 +2,43 @@ require 'monitor' require 'set' module ActiveRecord + # Raised when a connection could not be obtained within the connection + # acquisition timeout period. + class ConnectionTimeoutError < ConnectionNotEstablished + end + module ConnectionAdapters - # Connection pool base class and ActiveRecord database connections. - class ConnectionPool + # Connection pool base class for managing ActiveRecord database + # connections. + # + # Connections can be obtained and used from a connection pool in several + # ways: + # + # 1. Simply use ActiveRecord::Base.connection as in pre-connection-pooled + # ActiveRecord. Eventually, when you're done with the connection and + # wish it to be returned to the pool, you call + # ActiveRecord::Base.connection_pool.release_thread_connection. + # 2. Manually check out a connection from the pool with + # ActiveRecord::Base.connection_pool.checkout. You are responsible for + # returning this connection to the pool when finished by calling + # ActiveRecord::Base.connection_pool.checkin(connection). + # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which + # obtains a connection, yields it as the sole argument to the block, + # and returns it to the pool after the block completes. + class AbstractConnectionPool # Factory method for connection pools. - # Determines pool type to use based on contents of connection specification. - # FIXME: specification configuration TBD. + # Determines pool type to use based on contents of connection + # specification. Additional options for connection specification: + # + # * +pool+: number indicating size of fixed connection pool to use + # * +wait_timeout+ (optional): number of seconds to block and wait + # for a connection before giving up and raising a timeout error. def self.create(spec) - ConnectionPerThread.new(spec) + if spec.config[:pool] && spec.config[:pool].to_i > 0 + ConnectionPool.new(spec) + else + ConnectionPerThread.new(spec) + end end delegate :verification_timeout, :to => "::ActiveRecord::Base" @@ -115,11 +144,16 @@ module ActiveRecord end private :connections - synchronize :connection, :release_thread_connection, :checkout, :checkin, + synchronize :connection, :release_thread_connection, :clear_reloadable_connections!, :verify_active_connections!, :connected?, :disconnect!, :with => :@connection_mutex private + def new_connection + config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency) + ActiveRecord::Base.send(spec.adapter_method, config) + end + def active_connection_name #:nodoc: Thread.current.object_id end @@ -139,7 +173,10 @@ module ActiveRecord end end - class ConnectionPerThread < ConnectionPool + # ConnectionPerThread is a simple implementation: always create/disconnect + # on checkout/checkin, and use the base class reserved connections hash to + # manage the per-thread connections. + class ConnectionPerThread < AbstractConnectionPool def active_connection @reserved_connections[active_connection_name] end @@ -155,12 +192,6 @@ module ActiveRecord end private - # Set the connection for the class. - def new_connection - config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency) - ActiveRecord::Base.send(spec.adapter_method, config) - end - def connections @reserved_connections.values end @@ -170,6 +201,70 @@ module ActiveRecord end end + # ConnectionPool provides a full, fixed-size connection pool with timed + # waits when the pool is exhausted. + class ConnectionPool < AbstractConnectionPool + def initialize(spec) + super + # default 5 second timeout + @timeout = spec.config[:wait_timeout] || 5 + @size = spec.config[:pool].to_i + @queue = @connection_mutex.new_cond + @connections = [] + @checked_out = [] + end + + def checkout + # Checkout an available connection + conn = @connection_mutex.synchronize do + if @connections.length < @size + checkout_new_connection + elsif @checked_out.size < @connections.size + checkout_existing_connection + end + end + return conn if conn + + # No connections available; wait for one + @connection_mutex.synchronize do + if @queue.wait(@timeout) + checkout_existing_connection + else + raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion" + end + end + end + + def checkin(conn) + @connection_mutex.synchronize do + @checked_out -= conn + @queue.signal + end + end + + private + def checkout_new_connection + c = new_connection + @connections << c + @checked_out << c + c + end + + def checkout_existing_connection + c = [@connections - @checked_out].first + @checked_out << c + c + end + + def connections + @connections + end + + def remove_connection(conn) + @connections.delete conn + end + end + module ConnectionHandlerMethods def initialize(pools = {}) @connection_pools = pools @@ -180,7 +275,7 @@ module ActiveRecord end def establish_connection(name, spec) - @connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec) + @connection_pools[name] = ConnectionAdapters::AbstractConnectionPool.create(spec) end # for internal use only and for testing; @@ -238,15 +333,14 @@ module ActiveRecord pool.spec.config if pool end - private - def retrieve_connection_pool(klass) - loop do - pool = @connection_pools[klass.name] - return pool if pool - return nil if ActiveRecord::Base == klass - klass = klass.superclass - end + def retrieve_connection_pool(klass) + loop do + pool = @connection_pools[klass.name] + return pool if pool + return nil if ActiveRecord::Base == klass + klass = klass.superclass end + end end # This connection handler is not thread-safe, as it does not protect access diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb index 0ebb191381..0910db1951 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb @@ -116,6 +116,10 @@ module ActiveRecord retrieve_connection end + def connection_pool + connection_handler.retrieve_connection_pool(self) + end + def retrieve_connection connection_handler.retrieve_connection(self) end diff --git a/activerecord/test/cases/threaded_connections_test.rb b/activerecord/test/cases/threaded_connections_test.rb index 3f88f79189..3abe9aea56 100644 --- a/activerecord/test/cases/threaded_connections_test.rb +++ b/activerecord/test/cases/threaded_connections_test.rb @@ -40,4 +40,40 @@ unless %w(FrontBase).include? ActiveRecord::Base.connection.adapter_name assert_equal @connections.length, 5 end end + + class PooledConnectionsTest < ActiveRecord::TestCase + def setup + @connection = ActiveRecord::Base.remove_connection + @connections = [] + @allow_concurrency = ActiveRecord::Base.allow_concurrency + ActiveRecord::Base.allow_concurrency = true + end + + def teardown + ActiveRecord::Base.clear_all_connections! + ActiveRecord::Base.allow_concurrency = @allow_concurrency + ActiveRecord::Base.establish_connection(@connection) + end + + def gather_connections + ActiveRecord::Base.establish_connection(@connection.merge({:pool => 2, :wait_timeout => 0.3})) + @timed_out = 0 + + 4.times do + Thread.new do + begin + @connections << ActiveRecord::Base.connection_pool.checkout + rescue ActiveRecord::ConnectionTimeoutError + @timed_out += 1 + end + end.join + end + end + + def test_threaded_connections + gather_connections + assert_equal @connections.length, 2 + assert_equal @timed_out, 2 + end + end end |