diff options
Diffstat (limited to 'activerecord')
6 files changed, 255 insertions, 70 deletions
diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 2c8ec3d4d1..c4feabfb5f 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,5 +1,8 @@ ## Rails 4.0.0 (unreleased) ## +* Connections *must* be closed at the end of a thread. If not, your + connection pool can fill and an exception will be raised. + * Added the `ActiveRecord::Model` module which can be included in a class as an alternative to inheriting from `ActiveRecord::Base`: diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index ea738cb305..b8f99adc22 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -9,6 +9,13 @@ module ActiveRecord class ConnectionTimeoutError < ConnectionNotEstablished end + # Raised when a connection pool is full and another connection is requested + class PoolFullError < ConnectionNotEstablished + def initialize size, timeout + super("Connection pool of size #{size} and timeout #{timeout}s is full") + end + end + module ConnectionAdapters # Connection pool base class for managing Active Record database # connections. @@ -57,10 +64,35 @@ module ActiveRecord # * +wait_timeout+: number of seconds to block and wait for a connection # before giving up and raising a timeout error (default 5 seconds). class ConnectionPool + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. + # A reaper instantiated with a nil frequency will never reap the + # connection pool. + # + # Configure the frequency by setting "reaping_frequency" in your + # database yaml file. + class Reaper + attr_reader :pool, :frequency + + def initialize(pool, frequency) + @pool = pool + @frequency = frequency + end + + def run + return unless frequency + Thread.new(frequency, pool) { |t, p| + while true + sleep t + p.reap + end + } + end + end + include MonitorMixin - attr_accessor :automatic_reconnect - attr_reader :spec, :connections + attr_accessor :automatic_reconnect, :timeout + attr_reader :spec, :connections, :size, :reaper # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, @@ -76,8 +108,9 @@ module ActiveRecord # The cache of reserved connections mapped to threads @reserved_connections = {} - @queue = new_cond @timeout = spec.config[:wait_timeout] || 5 + @reaper = Reaper.new self, spec.config[:reaping_frequency] + @reaper.run # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 @@ -155,76 +188,47 @@ module ActiveRecord # associated with stale threads. def verify_active_connections! #:nodoc: synchronize do - clear_stale_cached_connections! @connections.each do |connection| connection.verify! end end end - # Return any checked-out connections back to the pool by threads that - # are no longer alive. - def clear_stale_cached_connections! - keys = @reserved_connections.keys - Thread.list.find_all { |t| - t.alive? - }.map { |thread| thread.object_id } - keys.each do |key| - conn = @reserved_connections[key] - ActiveSupport::Deprecation.warn(<<-eowarn) if conn.in_use? -Database connections will not be closed automatically, please close your -database connection at the end of the thread by calling `close` on your -connection. For example: ActiveRecord::Base.connection.close - eowarn - checkin conn - @reserved_connections.delete(key) - end + def clear_stale_cached_connections! # :nodoc: end + deprecate :clear_stale_cached_connections! # Check-out a database connection from the pool, indicating that you want # to use it. You should call #checkin when you no longer need this. # - # This is done by either returning an existing connection, or by creating - # a new connection. If the maximum number of connections for this pool has - # already been reached, but the pool is empty (i.e. they're all being used), - # then this method will wait until a thread has checked in a connection. - # The wait time is bounded however: if no connection can be checked out - # within the timeout specified for this pool, then a ConnectionTimeoutError - # exception will be raised. + # This is done by either returning and leasing existing connection, or by + # creating a new connection and leasing it. + # + # If all connections are leased and the pool is at capacity (meaning the + # number of currently leased connections is greater than or equal to the + # size limit set), an ActiveRecord::PoolFullError exception will be raised. # # Returns: an AbstractAdapter object. # # Raises: - # - ConnectionTimeoutError: no connection can be obtained from the pool - # within the timeout period. + # - PoolFullError: no connection can be obtained from the pool. def checkout # Checkout an available connection synchronize do - loop do - conn = @connections.find { |c| c.lease } - - unless conn - if @connections.size < @size - conn = checkout_new_connection - conn.lease - end - end - - if conn - checkout_and_verify conn - return conn - end - - @queue.wait(@timeout) - - if(active_connections.size < @connections.size) - next - else - clear_stale_cached_connections! - if @size == active_connections.size - raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout}. The max pool size is currently #{@size}; consider increasing it." - end - end + # Try to find a connection that hasn't been leased, and lease it + conn = connections.find { |c| c.lease } + + # If all connections were leased, and we have room to expand, + # create a new connection and lease it. + if !conn && connections.size < size + conn = checkout_new_connection + conn.lease + end + if conn + checkout_and_verify conn + else + raise PoolFullError.new(size, timeout) end end end @@ -238,7 +242,33 @@ connection. For example: ActiveRecord::Base.connection.close synchronize do conn.run_callbacks :checkin do conn.expire - @queue.signal + end + end + end + + # Remove a connection from the connection pool. The connection will + # remain open and active but will no longer be managed by this pool. + def remove(conn) + synchronize do + @connections.delete conn + + # FIXME: we might want to store the key on the connection so that removing + # from the reserved hash will be a little easier. + thread_id = @reserved_connections.keys.find { |k| + @reserved_connections[k] == conn + } + @reserved_connections.delete thread_id if thread_id + end + end + + # Removes dead connections from the pool. A dead connection can occur + # if a programmer forgets to close a connection at the end of a thread + # or a thread dies unexpectedly. + def reap + synchronize do + stale = Time.now - @timeout + connections.dup.each do |conn| + remove conn if conn.in_use? && stale > conn.last_use && !conn.active? end end end diff --git a/activerecord/lib/active_record/connection_adapters/connection_specification.rb b/activerecord/lib/active_record/connection_adapters/connection_specification.rb index 3e67c3a2b7..8491d42b86 100644 --- a/activerecord/lib/active_record/connection_adapters/connection_specification.rb +++ b/activerecord/lib/active_record/connection_adapters/connection_specification.rb @@ -3,10 +3,14 @@ module ActiveRecord class ConnectionSpecification #:nodoc: attr_reader :config, :adapter_method - def initialize (config, adapter_method) + def initialize(config, adapter_method) @config, @adapter_method = config, adapter_method end + def initialize_dup(original) + @config = original.config.dup + end + ## # Builds a ConnectionSpecification from user input class Resolver # :nodoc: diff --git a/activerecord/test/cases/connection_adapters/connection_specification_test.rb b/activerecord/test/cases/connection_adapters/connection_specification_test.rb new file mode 100644 index 0000000000..ea2196cda2 --- /dev/null +++ b/activerecord/test/cases/connection_adapters/connection_specification_test.rb @@ -0,0 +1,12 @@ +require "cases/helper" + +module ActiveRecord + module ConnectionAdapters + class ConnectionSpecificationTest < ActiveRecord::TestCase + def test_dup_deep_copy_config + spec = ConnectionSpecification.new({ :a => :b }, "bar") + assert_not_equal(spec.config.object_id, spec.dup.config.object_id) + end + end + end +end diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index d170a13b23..26842d3998 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -4,6 +4,8 @@ module ActiveRecord module ConnectionAdapters class ConnectionPoolTest < ActiveRecord::TestCase def setup + super + # Keep a duplicate pool so we do not bother others @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec @@ -18,6 +20,70 @@ module ActiveRecord end end + def teardown + super + @pool.connections.each(&:close) + end + + def test_full_pool_exception + assert_raises(PoolFullError) do + (@pool.size + 1).times do + @pool.checkout + end + end + end + + def test_reap_and_active + @pool.checkout + @pool.checkout + @pool.checkout + @pool.timeout = 0 + + connections = @pool.connections.dup + + @pool.reap + + assert_equal connections.length, @pool.connections.length + end + + def test_reap_inactive + @pool.checkout + @pool.checkout + @pool.checkout + @pool.timeout = 0 + + connections = @pool.connections.dup + connections.each do |conn| + conn.extend(Module.new { def active?; false; end; }) + end + + @pool.reap + + assert_equal 0, @pool.connections.length + ensure + connections.each(&:close) + end + + def test_remove_connection + conn = @pool.checkout + assert conn.in_use? + + length = @pool.connections.length + @pool.remove conn + assert conn.in_use? + assert_equal(length - 1, @pool.connections.length) + ensure + conn.close + end + + def test_remove_connection_for_thread + conn = @pool.connection + @pool.remove conn + assert_not_equal(conn, @pool.connection) + ensure + conn.close + end + def test_active_connection? assert !@pool.active_connection? assert @pool.connection @@ -35,27 +101,16 @@ module ActiveRecord threads << Thread.new(i) do |pool_count| connection = pool.connection assert_not_nil connection + connection.close end end - threads.each {|t| t.join} + threads.each(&:join) Thread.new do - threads.each do |t| - thread_ids = pool.instance_variable_get(:@reserved_connections).keys - assert thread_ids.include?(t.object_id) - end - - assert_deprecated do - pool.connection - end - threads.each do |t| - thread_ids = pool.instance_variable_get(:@reserved_connections).keys - assert !thread_ids.include?(t.object_id) - end + assert pool.connection pool.connection.close end.join - end def test_automatic_reconnect= diff --git a/activerecord/test/cases/reaper_test.rb b/activerecord/test/cases/reaper_test.rb new file mode 100644 index 0000000000..576ab60090 --- /dev/null +++ b/activerecord/test/cases/reaper_test.rb @@ -0,0 +1,81 @@ +require "cases/helper" + +module ActiveRecord + module ConnectionAdapters + class ReaperTest < ActiveRecord::TestCase + attr_reader :pool + + def setup + super + @pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec + end + + def teardown + super + @pool.connections.each(&:close) + end + + class FakePool + attr_reader :reaped + + def initialize + @reaped = false + end + + def reap + @reaped = true + end + end + + # A reaper with nil time should never reap connections + def test_nil_time + fp = FakePool.new + assert !fp.reaped + reaper = ConnectionPool::Reaper.new(fp, nil) + reaper.run + assert !fp.reaped + end + + def test_some_time + fp = FakePool.new + assert !fp.reaped + + reaper = ConnectionPool::Reaper.new(fp, 0.0001) + reaper.run + until fp.reaped + Thread.pass + end + assert fp.reaped + end + + def test_pool_has_reaper + assert pool.reaper + end + + def test_reaping_frequency_configuration + spec = ActiveRecord::Base.connection_pool.spec.dup + spec.config[:reaping_frequency] = 100 + pool = ConnectionPool.new spec + assert_equal 100, pool.reaper.frequency + end + + def test_connection_pool_starts_reaper + spec = ActiveRecord::Base.connection_pool.spec.dup + spec.config[:reaping_frequency] = 0.0001 + + pool = ConnectionPool.new spec + pool.timeout = 0 + + conn = pool.checkout + count = pool.connections.length + + conn.extend(Module.new { def active?; false; end; }) + + while count == pool.connections.length + Thread.pass + end + assert_equal(count - 1, pool.connections.length) + end + end + end +end |