diff options
author | Rafael Mendonça França <rafaelmfranca@gmail.com> | 2012-09-20 19:04:37 -0300 |
---|---|---|
committer | Rafael Mendonça França <rafaelmfranca@gmail.com> | 2012-09-20 19:06:20 -0300 |
commit | e4018a00fdfb6b0586c42d4a25a005b5ca416173 (patch) | |
tree | ccfd1f364d8827972c200473e21a34e035fb6fa9 /activerecord | |
parent | 50a76c14f95373b6e723e8603f43f1cf6af5b4c7 (diff) | |
download | rails-e4018a00fdfb6b0586c42d4a25a005b5ca416173.tar.gz rails-e4018a00fdfb6b0586c42d4a25a005b5ca416173.tar.bz2 rails-e4018a00fdfb6b0586c42d4a25a005b5ca416173.zip |
Revert "backport fair connection pool 02b2335563 to 3-2-stable"
This reverts commit 0693e079708a52b777f2b7872b8e3d467b880a0d.
Revert "Cache columns metadata to avoid extra while testing"
This reverts commit a82f1e3f5d11c8dfba9f4c911745ec40a7965216.
Reason: This is causing failures in the postgresql build.
See http://travis-ci.org/#!/rails/rails/builds/2485584
Related with #7675
Diffstat (limited to 'activerecord')
7 files changed, 51 insertions, 311 deletions
diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 5133438e32..b3c1b034d7 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,10 +1,5 @@ ## Rails 3.2.9 (unreleased) -* Make ActiveRecord::ConnectionPool 'fair', first thread waiting is - first thread given newly available connection. Backport of #6492 02b2335563 - - *jrochkind* - * Fix creation of through association models when using `collection=[]` on a `has_many :through` association from an unsaved model. Fix #7661. diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 235efc12f6..d4649102df 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -57,141 +57,10 @@ module ActiveRecord # * +wait_timeout+: number of seconds to block and wait for a connection # before giving up and raising a timeout error (default 5 seconds). class ConnectionPool - - # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool - # with which it shares a Monitor. But could be a generic Queue. - # - # The Queue in stdlib's 'thread' could replace this class except - # stdlib's doesn't support waiting with a timeout. - class Queue - def initialize(lock = Monitor.new) - @lock = lock - @cond = @lock.new_cond - @num_waiting = 0 - @queue = [] - end - - # Test if any threads are currently waiting on the queue. - def any_waiting? - synchronize do - @num_waiting > 0 - end - end - - # Return the number of threads currently waiting on this - # queue. - def num_waiting - synchronize do - @num_waiting - end - end - - # Add +element+ to the queue. Never blocks. - def add(element) - synchronize do - @queue.push element - @cond.signal - end - end - - # If +element+ is in the queue, remove and return it, or nil. - def delete(element) - synchronize do - @queue.delete(element) - end - end - - # Remove all elements from the queue. - def clear - synchronize do - @queue.clear - end - end - - # Remove the head of the queue. - # - # If +timeout+ is not given, remove and return the head the - # queue if the number of available elements is strictly - # greater than the number of threads currently waiting (that - # is, don't jump ahead in line). Otherwise, return nil. - # - # If +timeout+ is given, block if it there is no element - # available, waiting up to +timeout+ seconds for an element to - # become available. - # - # Raises: - # - ConnectionTimeoutError if +timeout+ is given and no element - # becomes available after +timeout+ seconds, - def poll(timeout = nil) - synchronize do - if timeout - no_wait_poll || wait_poll(timeout) - else - no_wait_poll - end - end - end - - private - - def synchronize(&block) - @lock.synchronize(&block) - end - - # Test if the queue currently contains any elements. - def any? - !@queue.empty? - end - - # A thread can remove an element from the queue without - # waiting if an only if the number of currently available - # connections is strictly greater than the number of waiting - # threads. - def can_remove_no_wait? - @queue.size > @num_waiting - end - - # Removes and returns the head of the queue if possible, or nil. - def remove - @queue.shift - end - - # Remove and return the head the queue if the number of - # available elements is strictly greater than the number of - # threads currently waiting. Otherwise, return nil. - def no_wait_poll - remove if can_remove_no_wait? - end - - # Waits on the queue up to +timeout+ seconds, then removes and - # returns the head of the queue. - def wait_poll(timeout) - @num_waiting += 1 - - t0 = Time.now - elapsed = 0 - loop do - @cond.wait(timeout - elapsed) - - return remove if any? - - elapsed = Time.now - t0 - - if elapsed >= timeout - msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' % - [timeout, elapsed] - raise ConnectionTimeoutError, msg - end - end - ensure - @num_waiting -= 1 - end - end - include MonitorMixin attr_accessor :automatic_reconnect - attr_reader :spec, :connections, :size + attr_reader :spec, :connections # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification # object which describes database connection information (e.g. adapter, @@ -207,6 +76,7 @@ module ActiveRecord # The cache of reserved connections mapped to threads @reserved_connections = {} + @queue = new_cond @timeout = spec.config[:wait_timeout] || 5 # default max pool size to 5 @@ -214,16 +84,6 @@ module ActiveRecord @connections = [] @automatic_reconnect = true - - @available = Queue.new self - end - - # Hack for tests to be able to add connections. Do not call outside of tests - def insert_connection_for_test!(c) #:nodoc: - synchronize do - @connections << c - @available.add c - end end # Retrieve the connection associated with the current thread, or call @@ -279,7 +139,6 @@ module ActiveRecord conn.disconnect! end @connections = [] - @available.clear end end @@ -291,15 +150,9 @@ module ActiveRecord checkin conn conn.disconnect! if conn.requires_reloading? end - @connections.delete_if do |conn| conn.requires_reloading? end - @available.clear - @connections.each do |conn| - @available.add conn - end - end end @@ -363,22 +216,58 @@ connection. For example: ActiveRecord::Base.connection.close # Check-out a database connection from the pool, indicating that you want # to use it. You should call #checkin when you no longer need this. # - # This is done by either returning and leasing existing connection, or by - # creating a new connection and leasing it. - # - # If all connections are leased and the pool is at capacity (meaning the - # number of currently leased connections is greater than or equal to the - # size limit set), an ActiveRecord::PoolFullError exception will be raised. + # This is done by either returning an existing connection, or by creating + # a new connection. If the maximum number of connections for this pool has + # already been reached, but the pool is empty (i.e. they're all being used), + # then this method will wait until a thread has checked in a connection. + # The wait time is bounded however: if no connection can be checked out + # within the timeout specified for this pool, then a ConnectionTimeoutError + # exception will be raised. # # Returns: an AbstractAdapter object. # # Raises: - # - PoolFullError: no connection can be obtained from the pool. + # - ConnectionTimeoutError: no connection can be obtained from the pool + # within the timeout period. def checkout synchronize do - conn = acquire_connection - conn.lease - checkout_and_verify(conn) + waited_time = 0 + + loop do + conn = @connections.find { |c| c.lease } + + unless conn + if @connections.size < @size + conn = checkout_new_connection + conn.lease + end + end + + if conn + checkout_and_verify conn + return conn + end + + if waited_time >= @timeout + raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it." + end + + # Sometimes our wait can end because a connection is available, + # but another thread can snatch it up first. If timeout hasn't + # passed but no connection is avail, looks like that happened -- + # loop and wait again, for the time remaining on our timeout. + before_wait = Time.now + @queue.wait( [@timeout - waited_time, 0].max ) + waited_time += (Time.now - before_wait) + + # Will go away in Rails 4, when we don't clean up + # after leaked connections automatically anymore. Right now, clean + # up after we've returned from a 'wait' if it looks like it's + # needed, then loop and try again. + if(active_connections.size >= @connections.size) + clear_stale_cached_connections! + end + end end end @@ -391,40 +280,10 @@ connection. For example: ActiveRecord::Base.connection.close synchronize do conn.run_callbacks :checkin do conn.expire + @queue.signal end release conn - - @available.add conn - end - end - - # Acquire a connection by one of 1) immediately removing one - # from the queue of available connections, 2) creating a new - # connection if the pool is not at capacity, 3) waiting on the - # queue for a connection to become available (first calling - # clear_stale_cached_connections! to clean up leaked connections, - # this cleanup will prob be going away in Rails4). - # - # Raises: - # - ConnectionTimeoutError if a connection could not be acquired - def acquire_connection - if conn = @available.poll - conn - elsif @connections.size < @size - checkout_new_connection - else - # this conditional clear_stale will go away in Rails 4, when we don't - # clean up after leaked connections automatically anymore. Right now, - # clean up after we've returned from a 'wait' if it looks like it's - # needed before trying to wait for a connection. - synchronize do - if(active_connections.size >= @connections.size) - clear_stale_cached_connections! - end - end - - @available.poll(@timeout) end end diff --git a/activerecord/test/cases/associations/eager_test.rb b/activerecord/test/cases/associations/eager_test.rb index 544c01461d..bf01b46852 100644 --- a/activerecord/test/cases/associations/eager_test.rb +++ b/activerecord/test/cases/associations/eager_test.rb @@ -929,10 +929,6 @@ class EagerAssociationTest < ActiveRecord::TestCase end def test_eager_loading_with_conditions_on_joined_table_preloads - # cache metadata in advance to avoid extra sql statements executed while testing - Tagging.first - Tag.first - posts = assert_queries(2) do Post.find(:all, :select => 'distinct posts.*', :include => :author, :joins => [:comments], :conditions => "comments.body like 'Thank you%'", :order => 'posts.id') end @@ -981,9 +977,7 @@ class EagerAssociationTest < ActiveRecord::TestCase end def test_eager_loading_with_conditions_on_join_model_preloads - # cache metadata in advance to avoid extra sql statements executed while testing Author.columns - AuthorAddress.first authors = assert_queries(2) do Author.find(:all, :include => :author_address, :joins => :comments, :conditions => "posts.title like 'Welcome%'") diff --git a/activerecord/test/cases/associations/has_many_associations_test.rb b/activerecord/test/cases/associations/has_many_associations_test.rb index 618daa1b35..30cfcc53d2 100644 --- a/activerecord/test/cases/associations/has_many_associations_test.rb +++ b/activerecord/test/cases/associations/has_many_associations_test.rb @@ -1470,8 +1470,6 @@ class HasManyAssociationsTest < ActiveRecord::TestCase end def test_custom_primary_key_on_new_record_should_fetch_with_query - Essay.first # cache metadata in advance to avoid extra sql statements executed while testing - author = Author.new(:name => "David") assert !author.essays.loaded? diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb index 3dcde699fc..7af9079b48 100644 --- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb +++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb @@ -36,7 +36,7 @@ module ActiveRecord def test_close pool = ConnectionPool.new(Base::ConnectionSpecification.new({}, nil)) - pool.insert_connection_for_test! adapter + pool.connections << adapter adapter.pool = pool # Make sure the pool marks the connection in use diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index 2ada2c1fe1..5cecfa90e7 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -128,110 +128,6 @@ module ActiveRecord end - # The connection pool is "fair" if threads waiting for - # connections receive them the order in which they began - # waiting. This ensures that we don't timeout one HTTP request - # even while well under capacity in a multi-threaded environment - # such as a Java servlet container. - # - # We don't need strict fairness: if two connections become - # available at the same time, it's fine of two threads that were - # waiting acquire the connections out of order. - # - # Thus this test prepares waiting threads and then trickles in - # available connections slowly, ensuring the wakeup order is - # correct in this case. - def test_checkout_fairness - @pool.instance_variable_set(:@size, 10) - expected = (1..@pool.size).to_a.freeze - # check out all connections so our threads start out waiting - conns = expected.map { @pool.checkout } - mutex = Mutex.new - order = [] - errors = [] - - threads = expected.map do |i| - t = Thread.new { - begin - @pool.checkout # connection return value never checked back in - mutex.synchronize { order << i } - rescue => e - mutex.synchronize { errors << e } - end - } - Thread.pass until t.status == "sleep" - t - end - - # this should wake up the waiting threads one by one in order - conns.each { |conn| @pool.checkin(conn); sleep 0.1 } - - threads.each(&:join) - - raise errors.first if errors.any? - - assert_equal(expected, order) - end - - # As mentioned in #test_checkout_fairness, we don't care about - # strict fairness. This test creates two groups of threads: - # group1 whose members all start waiting before any thread in - # group2. Enough connections are checked in to wakeup all - # group1 threads, and the fact that only group1 and no group2 - # threads acquired a connection is enforced. - def test_checkout_fairness_by_group - @pool.instance_variable_set(:@size, 10) - # take all the connections - conns = (1..10).map { @pool.checkout } - mutex = Mutex.new - successes = [] # threads that successfully got a connection - errors = [] - - make_thread = proc do |i| - t = Thread.new { - begin - @pool.checkout # connection return value never checked back in - mutex.synchronize { successes << i } - rescue => e - mutex.synchronize { errors << e } - end - } - Thread.pass until t.status == "sleep" - t - end - - # all group1 threads start waiting before any in group2 - group1 = (1..5).map(&make_thread) - group2 = (6..10).map(&make_thread) - - # checkin n connections back to the pool - checkin = proc do |n| - n.times do - c = conns.pop - @pool.checkin(c) - end - end - - checkin.call(group1.size) # should wake up all group1 - - loop do - sleep 0.1 - break if mutex.synchronize { (successes.size + errors.size) == group1.size } - end - - winners = mutex.synchronize { successes.dup } - checkin.call(group2.size) # should wake up everyone remaining - - group1.each(&:join) - group2.each(&:join) - - assert_equal((1..group1.size).to_a, winners.sort) - - if errors.any? - raise errors.first - end - end - def test_automatic_reconnect= pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec assert pool.automatic_reconnect diff --git a/activerecord/test/cases/dirty_test.rb b/activerecord/test/cases/dirty_test.rb index e1129704a6..3bd2e909d7 100644 --- a/activerecord/test/cases/dirty_test.rb +++ b/activerecord/test/cases/dirty_test.rb @@ -325,8 +325,6 @@ class DirtyTest < ActiveRecord::TestCase end def test_partial_update_with_optimistic_locking - Person.first # cache metadata in advance to avoid extra sql statements executed while testing - person = Person.new(:first_name => 'foo') old_lock_version = 1 @@ -532,7 +530,7 @@ class DirtyTest < ActiveRecord::TestCase pirate = target.create(:created_on => created_on) pirate.reload # Here mysql truncate the usec value to 0 - + pirate.created_on = created_on assert !pirate.created_on_changed? end |