From 0693e079708a52b777f2b7872b8e3d467b880a0d Mon Sep 17 00:00:00 2001 From: Jonathan Rochkind Date: Mon, 17 Sep 2012 17:06:14 -0400 Subject: backport fair connection pool 02b2335563 to 3-2-stable --- activerecord/CHANGELOG.md | 5 + .../abstract/connection_pool.rb | 239 ++++++++++++++++----- .../connection_adapters/abstract_adapter_test.rb | 2 +- activerecord/test/cases/connection_pool_test.rb | 104 +++++++++ 4 files changed, 300 insertions(+), 50 deletions(-) diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index b3c1b034d7..5133438e32 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,5 +1,10 @@ ## Rails 3.2.9 (unreleased) +* Make ActiveRecord::ConnectionPool 'fair', first thread waiting is + first thread given newly available connection. Backport of #6492 02b2335563 + + *jrochkind* + * Fix creation of through association models when using `collection=[]` on a `has_many :through` association from an unsaved model. Fix #7661. diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index d4649102df..235efc12f6 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -57,10 +57,141 @@ module ActiveRecord # * +wait_timeout+: number of seconds to block and wait for a connection # before giving up and raising a timeout error (default 5 seconds). class ConnectionPool + + # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool + # with which it shares a Monitor. But could be a generic Queue. + # + # The Queue in stdlib's 'thread' could replace this class except + # stdlib's doesn't support waiting with a timeout. + class Queue + def initialize(lock = Monitor.new) + @lock = lock + @cond = @lock.new_cond + @num_waiting = 0 + @queue = [] + end + + # Test if any threads are currently waiting on the queue. + def any_waiting? + synchronize do + @num_waiting > 0 + end + end + + # Return the number of threads currently waiting on this + # queue. + def num_waiting + synchronize do + @num_waiting + end + end + + # Add +element+ to the queue. Never blocks. + def add(element) + synchronize do + @queue.push element + @cond.signal + end + end + + # If +element+ is in the queue, remove and return it, or nil. + def delete(element) + synchronize do + @queue.delete(element) + end + end + + # Remove all elements from the queue. + def clear + synchronize do + @queue.clear + end + end + + # Remove the head of the queue. + # + # If +timeout+ is not given, remove and return the head the + # queue if the number of available elements is strictly + # greater than the number of threads currently waiting (that + # is, don't jump ahead in line). Otherwise, return nil. + # + # If +timeout+ is given, block if it there is no element + # available, waiting up to +timeout+ seconds for an element to + # become available. + # + # Raises: + # - ConnectionTimeoutError if +timeout+ is given and no element + # becomes available after +timeout+ seconds, + def poll(timeout = nil) + synchronize do + if timeout + no_wait_poll || wait_poll(timeout) + else + no_wait_poll + end + end + end + + private + + def synchronize(&block) + @lock.synchronize(&block) + end + + # Test if the queue currently contains any elements. + def any? + !@queue.empty? + end + + # A thread can remove an element from the queue without + # waiting if an only if the number of currently available + # connections is strictly greater than the number of waiting + # threads. + def can_remove_no_wait? + @queue.size > @num_waiting + end + + # Removes and returns the head of the queue if possible, or nil. + def remove + @queue.shift + end + + # Remove and return the head the queue if the number of + # available elements is strictly greater than the number of + # threads currently waiting. Otherwise, return nil. + def no_wait_poll + remove if can_remove_no_wait? + end + + # Waits on the queue up to +timeout+ seconds, then removes and + # returns the head of the queue. + def wait_poll(timeout) + @num_waiting += 1 + + t0 = Time.now + elapsed = 0 + loop do + @cond.wait(timeout - elapsed) + + return remove if any? + + elapsed = Time.now - t0 + + if elapsed >= timeout + msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' % + [timeout, elapsed] + raise ConnectionTimeoutError, msg + end + end + ensure + @num_waiting -= 1 + end + end + include MonitorMixin attr_accessor :automatic_reconnect - attr_reader :spec, :connections + attr_reader :spec, :connections, :size # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, @@ -76,7 +207,6 @@ module ActiveRecord # The cache of reserved connections mapped to threads @reserved_connections = {} - @queue = new_cond @timeout = spec.config[:wait_timeout] || 5 # default max pool size to 5 @@ -84,6 +214,16 @@ module ActiveRecord @connections = [] @automatic_reconnect = true + + @available = Queue.new self + end + + # Hack for tests to be able to add connections. Do not call outside of tests + def insert_connection_for_test!(c) #:nodoc: + synchronize do + @connections << c + @available.add c + end end # Retrieve the connection associated with the current thread, or call @@ -139,6 +279,7 @@ module ActiveRecord conn.disconnect! end @connections = [] + @available.clear end end @@ -150,9 +291,15 @@ module ActiveRecord checkin conn conn.disconnect! if conn.requires_reloading? end + @connections.delete_if do |conn| conn.requires_reloading? end + @available.clear + @connections.each do |conn| + @available.add conn + end + end end @@ -216,58 +363,22 @@ connection. For example: ActiveRecord::Base.connection.close # Check-out a database connection from the pool, indicating that you want # to use it. You should call #checkin when you no longer need this. # - # This is done by either returning an existing connection, or by creating - # a new connection. If the maximum number of connections for this pool has - # already been reached, but the pool is empty (i.e. they're all being used), - # then this method will wait until a thread has checked in a connection. - # The wait time is bounded however: if no connection can be checked out - # within the timeout specified for this pool, then a ConnectionTimeoutError - # exception will be raised. + # This is done by either returning and leasing existing connection, or by + # creating a new connection and leasing it. + # + # If all connections are leased and the pool is at capacity (meaning the + # number of currently leased connections is greater than or equal to the + # size limit set), an ActiveRecord::PoolFullError exception will be raised. # # Returns: an AbstractAdapter object. # # Raises: - # - ConnectionTimeoutError: no connection can be obtained from the pool - # within the timeout period. + # - PoolFullError: no connection can be obtained from the pool. def checkout synchronize do - waited_time = 0 - - loop do - conn = @connections.find { |c| c.lease } - - unless conn - if @connections.size < @size - conn = checkout_new_connection - conn.lease - end - end - - if conn - checkout_and_verify conn - return conn - end - - if waited_time >= @timeout - raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it." - end - - # Sometimes our wait can end because a connection is available, - # but another thread can snatch it up first. If timeout hasn't - # passed but no connection is avail, looks like that happened -- - # loop and wait again, for the time remaining on our timeout. - before_wait = Time.now - @queue.wait( [@timeout - waited_time, 0].max ) - waited_time += (Time.now - before_wait) - - # Will go away in Rails 4, when we don't clean up - # after leaked connections automatically anymore. Right now, clean - # up after we've returned from a 'wait' if it looks like it's - # needed, then loop and try again. - if(active_connections.size >= @connections.size) - clear_stale_cached_connections! - end - end + conn = acquire_connection + conn.lease + checkout_and_verify(conn) end end @@ -280,10 +391,40 @@ connection. For example: ActiveRecord::Base.connection.close synchronize do conn.run_callbacks :checkin do conn.expire - @queue.signal end release conn + + @available.add conn + end + end + + # Acquire a connection by one of 1) immediately removing one + # from the queue of available connections, 2) creating a new + # connection if the pool is not at capacity, 3) waiting on the + # queue for a connection to become available (first calling + # clear_stale_cached_connections! to clean up leaked connections, + # this cleanup will prob be going away in Rails4). + # + # Raises: + # - ConnectionTimeoutError if a connection could not be acquired + def acquire_connection + if conn = @available.poll + conn + elsif @connections.size < @size + checkout_new_connection + else + # this conditional clear_stale will go away in Rails 4, when we don't + # clean up after leaked connections automatically anymore. Right now, + # clean up after we've returned from a 'wait' if it looks like it's + # needed before trying to wait for a connection. + synchronize do + if(active_connections.size >= @connections.size) + clear_stale_cached_connections! + end + end + + @available.poll(@timeout) end end diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb index 7af9079b48..3dcde699fc 100644 --- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb +++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb @@ -36,7 +36,7 @@ module ActiveRecord def test_close pool = ConnectionPool.new(Base::ConnectionSpecification.new({}, nil)) - pool.connections << adapter + pool.insert_connection_for_test! adapter adapter.pool = pool # Make sure the pool marks the connection in use diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index 5cecfa90e7..2ada2c1fe1 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -128,6 +128,110 @@ module ActiveRecord end + # The connection pool is "fair" if threads waiting for + # connections receive them the order in which they began + # waiting. This ensures that we don't timeout one HTTP request + # even while well under capacity in a multi-threaded environment + # such as a Java servlet container. + # + # We don't need strict fairness: if two connections become + # available at the same time, it's fine of two threads that were + # waiting acquire the connections out of order. + # + # Thus this test prepares waiting threads and then trickles in + # available connections slowly, ensuring the wakeup order is + # correct in this case. + def test_checkout_fairness + @pool.instance_variable_set(:@size, 10) + expected = (1..@pool.size).to_a.freeze + # check out all connections so our threads start out waiting + conns = expected.map { @pool.checkout } + mutex = Mutex.new + order = [] + errors = [] + + threads = expected.map do |i| + t = Thread.new { + begin + @pool.checkout # connection return value never checked back in + mutex.synchronize { order << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until t.status == "sleep" + t + end + + # this should wake up the waiting threads one by one in order + conns.each { |conn| @pool.checkin(conn); sleep 0.1 } + + threads.each(&:join) + + raise errors.first if errors.any? + + assert_equal(expected, order) + end + + # As mentioned in #test_checkout_fairness, we don't care about + # strict fairness. This test creates two groups of threads: + # group1 whose members all start waiting before any thread in + # group2. Enough connections are checked in to wakeup all + # group1 threads, and the fact that only group1 and no group2 + # threads acquired a connection is enforced. + def test_checkout_fairness_by_group + @pool.instance_variable_set(:@size, 10) + # take all the connections + conns = (1..10).map { @pool.checkout } + mutex = Mutex.new + successes = [] # threads that successfully got a connection + errors = [] + + make_thread = proc do |i| + t = Thread.new { + begin + @pool.checkout # connection return value never checked back in + mutex.synchronize { successes << i } + rescue => e + mutex.synchronize { errors << e } + end + } + Thread.pass until t.status == "sleep" + t + end + + # all group1 threads start waiting before any in group2 + group1 = (1..5).map(&make_thread) + group2 = (6..10).map(&make_thread) + + # checkin n connections back to the pool + checkin = proc do |n| + n.times do + c = conns.pop + @pool.checkin(c) + end + end + + checkin.call(group1.size) # should wake up all group1 + + loop do + sleep 0.1 + break if mutex.synchronize { (successes.size + errors.size) == group1.size } + end + + winners = mutex.synchronize { successes.dup } + checkin.call(group2.size) # should wake up everyone remaining + + group1.each(&:join) + group2.each(&:join) + + assert_equal((1..group1.size).to_a, winners.sort) + + if errors.any? + raise errors.first + end + end + def test_automatic_reconnect= pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec assert pool.automatic_reconnect -- cgit v1.2.3