diff options
5 files changed, 286 insertions, 38 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index c259e46073..61ff603006 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -2,7 +2,6 @@ require 'thread' require 'monitor' require 'set' require 'active_support/core_ext/module/deprecation' -require 'timeout' module ActiveRecord # Raised when a connection could not be obtained within the connection @@ -70,6 +69,131 @@ module ActiveRecord # after which the Reaper will consider a connection reapable. (default # 5 seconds). class ConnectionPool + # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool + # with which it shares a Monitor. But could be a generic Queue. + # + # The Queue in stdlib's 'thread' could replace this class except + # stdlib's doesn't support waiting with a timeout. + class Queue + def initialize(lock = Monitor.new) + @lock = lock + @cond = @lock.new_cond + @num_waiting = 0 + @queue = [] + end + + # Test if any threads are currently waiting on the queue. + def any_waiting? + synchronize do + @num_waiting > 0 + end + end + + # Return the number of threads currently waiting on this + # queue. + def num_waiting + synchronize do + @num_waiting + end + end + + # Add +element+ to the queue. Never blocks. + def add(element) + synchronize do + @queue.push element + @cond.signal + end + end + + # If +element+ is in the queue, remove and return it, or nil. + def delete(element) + synchronize do + @queue.delete(element) + end + end + + # Remove all elements from the queue. + def clear + synchronize do + @queue.clear + end + end + + # Remove the head of the queue. + # + # If +timeout+ is not given, remove and return the head the + # queue if the number of available elements is strictly + # greater than the number of threads currently waiting (that + # is, don't jump ahead in line). Otherwise, return nil. + # + # If +timeout+ is given, block if it there is no element + # available, waiting up to +timeout+ seconds for an element to + # become available. + # + # Raises: + # - ConnectionTimeoutError if +timeout+ is given and no element + # becomes available after +timeout+ seconds, + def poll(timeout = nil) + synchronize do + if timeout + no_wait_poll || wait_poll(timeout) + else + no_wait_poll + end + end + end + + private + + def synchronize(&block) + @lock.synchronize(&block) + end + + # Test if the queue currently contains any elements. + def any? + !@queue.empty? + end + + # A thread can remove an element from the queue without + # waiting if an only if the number of currently available + # connections is strictly greater than the number of waiting + # threads. + def can_remove_no_wait? + @queue.size > @num_waiting + end + + # Removes and returns the head of the queue if possible, or nil. + def remove + @queue.shift + end + + # Remove and return the head the queue if the number of + # available elements is strictly greater than the number of + # threads currently waiting. Otherwise, return nil. + def no_wait_poll + remove if can_remove_no_wait? + end + + # Waits on the queue up to +timeout+ seconds, then removes and + # returns the head of the queue. + def wait_poll(timeout) + @num_waiting += 1 + + t0 = Time.now + elapsed = 0 + loop do + @cond.wait(timeout - elapsed) + + return remove if any? + + elapsed = Time.now - t0 + raise ConnectionTimeoutError if elapsed >= timeout + end + ensure + @num_waiting -= 1 + end + end + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. # A reaper instantiated with a nil frequency will never reap the # connection pool. @@ -100,21 +224,6 @@ module ActiveRecord attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout attr_reader :spec, :connections, :size, :reaper - class Latch # :nodoc: - def initialize - @mutex = Mutex.new - @cond = ConditionVariable.new - end - - def release - @mutex.synchronize { @cond.broadcast } - end - - def await - @mutex.synchronize { @cond.wait @mutex } - end - end - # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, # host name, username, password, etc), as well as the maximum size for @@ -137,9 +246,18 @@ module ActiveRecord # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 - @latch = Latch.new @connections = [] @automatic_reconnect = true + + @available = Queue.new self + end + + # Hack for tests to be able to add connections. Do not call outside of tests + def insert_connection_for_test!(c) #:nodoc: + synchronize do + @connections << c + @available.add c + end end # Retrieve the connection associated with the current thread, or call @@ -197,6 +315,7 @@ module ActiveRecord conn.disconnect! end @connections = [] + @available.clear end end @@ -211,6 +330,10 @@ module ActiveRecord @connections.delete_if do |conn| conn.requires_reloading? end + @available.clear + @connections.each do |conn| + @available.add conn + end end end @@ -234,23 +357,10 @@ module ActiveRecord # Raises: # - PoolFullError: no connection can be obtained from the pool. def checkout - loop do - # Checkout an available connection - synchronize do - # Try to find a connection that hasn't been leased, and lease it - conn = connections.find { |c| c.lease } - - # If all connections were leased, and we have room to expand, - # create a new connection and lease it. - if !conn && connections.size < size - conn = checkout_new_connection - conn.lease - end - - return checkout_and_verify(conn) if conn - end - - Timeout.timeout(@checkout_timeout, PoolFullError) { @latch.await } + synchronize do + conn = acquire_connection + conn.lease + checkout_and_verify(conn) end end @@ -266,8 +376,9 @@ module ActiveRecord end release conn + + @available.add conn end - @latch.release end # Remove a connection from the connection pool. The connection will @@ -275,12 +386,14 @@ module ActiveRecord def remove(conn) synchronize do @connections.delete conn + @available.delete conn # FIXME: we might want to store the key on the connection so that removing # from the reserved hash will be a little easier. release conn + + @available.add checkout_new_connection if @available.any_waiting? end - @latch.release end # Removes dead connections from the pool. A dead connection can occur @@ -293,11 +406,35 @@ module ActiveRecord remove conn if conn.in_use? && stale > conn.last_use && !conn.active? end end - @latch.release end private + # Acquire a connection by one of 1) immediately removing one + # from the queue of available connections, 2) creating a new + # connection if the pool is not at capacity, 3) waiting on the + # queue for a connection to become available. + # + # Raises: + # - PoolFullError if a connection could not be acquired (FIXME: + # why not ConnectionTimeoutError? + def acquire_connection + if conn = @available.poll + conn + elsif @connections.size < @size + checkout_new_connection + else + t0 = Time.now + begin + @available.poll(@checkout_timeout) + rescue ConnectionTimeoutError + msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' % + [@checkout_timeout, Time.now - t0] + raise PoolFullError, msg + end + end + end + def release(conn) thread_id = if @reserved_connections[current_connection_id] == conn current_connection_id diff --git a/activerecord/test/cases/associations/eager_test.rb b/activerecord/test/cases/associations/eager_test.rb index 76d3eb8946..58786e9011 100644 --- a/activerecord/test/cases/associations/eager_test.rb +++ b/activerecord/test/cases/associations/eager_test.rb @@ -962,6 +962,10 @@ class EagerAssociationTest < ActiveRecord::TestCase end def test_eager_loading_with_conditions_on_joined_table_preloads + # cache metadata in advance to avoid extra sql statements executed while testing + Tagging.first + Tag.first + posts = assert_queries(2) do Post.scoped(:select => 'distinct posts.*', :includes => :author, :joins => [:comments], :where => "comments.body like 'Thank you%'", :order => 'posts.id').all end diff --git a/activerecord/test/cases/associations/has_many_associations_test.rb b/activerecord/test/cases/associations/has_many_associations_test.rb index 0d8f311117..6eb71cb1e0 100644 --- a/activerecord/test/cases/associations/has_many_associations_test.rb +++ b/activerecord/test/cases/associations/has_many_associations_test.rb @@ -1353,6 +1353,9 @@ class HasManyAssociationsTest < ActiveRecord::TestCase author = Author.new(:name => "David") assert !author.essays.loaded? + # cache metadata in advance to avoid extra sql statements executed while testing + Essay.first + assert_queries 1 do assert_equal 1, author.essays.size end diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb index 7dc6e8afcb..3e3d6e2769 100644 --- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb +++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb @@ -36,7 +36,7 @@ module ActiveRecord def test_close pool = ConnectionPool.new(ConnectionSpecification.new({}, nil)) - pool.connections << adapter + pool.insert_connection_for_test! adapter adapter.pool = pool # Make sure the pool marks the connection in use diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index bba7815d73..aa1f5e6bad 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -200,6 +200,110 @@ module ActiveRecord end.join end + # The connection pool is "fair" if threads waiting for + # connections receive them the order in which they began + # waiting. This ensures that we don't timeout one HTTP request + # even while well under capacity in a multi-threaded environment + # such as a Java servlet container. + # + # We don't need strict fairness: if two connections become + # available at the same time, it's fine of two threads that were + # waiting acquire the connections out of order. + # + # Thus this test prepares waiting threads and then trickles in + # available connections slowly, ensuring the wakeup order is + # correct in this case. + def test_checkout_fairness + @pool.instance_variable_set(:@size, 10) + expected = (1..@pool.size).to_a.freeze + # check out all connections so our threads start out waiting + conns = expected.map { @pool.checkout } + mutex = Mutex.new + order = [] + errors = [] + + threads = expected.map do |i| + t = Thread.new { + begin + conn = @pool.checkout # never checked back in + mutex.synchronize { order << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until t.status == "sleep" + t + end + + # this should wake up the waiting threads one by one in order + conns.each { |conn| @pool.checkin(conn); sleep 0.1 } + + threads.each(&:join) + + raise errors.first if errors.any? + + assert_equal(expected, order) + end + + # As mentioned in #test_checkout_fairness, we don't care about + # strict fairness. This test creates two groups of threads: + # group1 whose members all start waiting before any thread in + # group2. Enough connections are checked in to wakeup all + # group1 threads, and the fact that only group1 and no group2 + # threads acquired a connection is enforced. + def test_checkout_fairness_by_group + @pool.instance_variable_set(:@size, 10) + # take all the connections + conns = (1..10).map { @pool.checkout } + mutex = Mutex.new + successes = [] # threads that successfully got a connection + errors = [] + + make_thread = proc do |i| + t = Thread.new { + begin + conn = @pool.checkout # never checked back in + mutex.synchronize { successes << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until t.status == "sleep" + t + end + + # all group1 threads start waiting before any in group2 + group1 = (1..5).map(&make_thread) + group2 = (6..10).map(&make_thread) + + # checkin n connections back to the pool + checkin = proc do |n| + n.times do + c = conns.pop + @pool.checkin(c) + end + end + + checkin.call(group1.size) # should wake up all group1 + + loop do + sleep 0.1 + break if mutex.synchronize { (successes.size + errors.size) == group1.size } + end + + winners = mutex.synchronize { successes.dup } + checkin.call(group2.size) # should wake up everyone remaining + + group1.each(&:join) + group2.each(&:join) + + assert_equal((1..group1.size).to_a, winners.sort) + + if errors.any? + raise errors.first + end + end + def test_automatic_reconnect= pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec assert pool.automatic_reconnect |