aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord
diff options
context:
space:
mode:
Diffstat (limited to 'activerecord')
-rw-r--r--activerecord/CHANGELOG.md5
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb239
-rw-r--r--activerecord/test/cases/associations/eager_test.rb6
-rw-r--r--activerecord/test/cases/associations/has_many_associations_test.rb2
-rw-r--r--activerecord/test/cases/connection_adapters/abstract_adapter_test.rb2
-rw-r--r--activerecord/test/cases/connection_pool_test.rb104
-rw-r--r--activerecord/test/cases/dirty_test.rb4
7 files changed, 51 insertions, 311 deletions
diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md
index 5133438e32..b3c1b034d7 100644
--- a/activerecord/CHANGELOG.md
+++ b/activerecord/CHANGELOG.md
@@ -1,10 +1,5 @@
## Rails 3.2.9 (unreleased)
-* Make ActiveRecord::ConnectionPool 'fair', first thread waiting is
- first thread given newly available connection. Backport of #6492 02b2335563
-
- *jrochkind*
-
* Fix creation of through association models when using `collection=[]`
on a `has_many :through` association from an unsaved model.
Fix #7661.
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
index 235efc12f6..d4649102df 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -57,141 +57,10 @@ module ActiveRecord
# * +wait_timeout+: number of seconds to block and wait for a connection
# before giving up and raising a timeout error (default 5 seconds).
class ConnectionPool
-
- # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
- # with which it shares a Monitor. But could be a generic Queue.
- #
- # The Queue in stdlib's 'thread' could replace this class except
- # stdlib's doesn't support waiting with a timeout.
- class Queue
- def initialize(lock = Monitor.new)
- @lock = lock
- @cond = @lock.new_cond
- @num_waiting = 0
- @queue = []
- end
-
- # Test if any threads are currently waiting on the queue.
- def any_waiting?
- synchronize do
- @num_waiting > 0
- end
- end
-
- # Return the number of threads currently waiting on this
- # queue.
- def num_waiting
- synchronize do
- @num_waiting
- end
- end
-
- # Add +element+ to the queue. Never blocks.
- def add(element)
- synchronize do
- @queue.push element
- @cond.signal
- end
- end
-
- # If +element+ is in the queue, remove and return it, or nil.
- def delete(element)
- synchronize do
- @queue.delete(element)
- end
- end
-
- # Remove all elements from the queue.
- def clear
- synchronize do
- @queue.clear
- end
- end
-
- # Remove the head of the queue.
- #
- # If +timeout+ is not given, remove and return the head the
- # queue if the number of available elements is strictly
- # greater than the number of threads currently waiting (that
- # is, don't jump ahead in line). Otherwise, return nil.
- #
- # If +timeout+ is given, block if it there is no element
- # available, waiting up to +timeout+ seconds for an element to
- # become available.
- #
- # Raises:
- # - ConnectionTimeoutError if +timeout+ is given and no element
- # becomes available after +timeout+ seconds,
- def poll(timeout = nil)
- synchronize do
- if timeout
- no_wait_poll || wait_poll(timeout)
- else
- no_wait_poll
- end
- end
- end
-
- private
-
- def synchronize(&block)
- @lock.synchronize(&block)
- end
-
- # Test if the queue currently contains any elements.
- def any?
- !@queue.empty?
- end
-
- # A thread can remove an element from the queue without
- # waiting if an only if the number of currently available
- # connections is strictly greater than the number of waiting
- # threads.
- def can_remove_no_wait?
- @queue.size > @num_waiting
- end
-
- # Removes and returns the head of the queue if possible, or nil.
- def remove
- @queue.shift
- end
-
- # Remove and return the head the queue if the number of
- # available elements is strictly greater than the number of
- # threads currently waiting. Otherwise, return nil.
- def no_wait_poll
- remove if can_remove_no_wait?
- end
-
- # Waits on the queue up to +timeout+ seconds, then removes and
- # returns the head of the queue.
- def wait_poll(timeout)
- @num_waiting += 1
-
- t0 = Time.now
- elapsed = 0
- loop do
- @cond.wait(timeout - elapsed)
-
- return remove if any?
-
- elapsed = Time.now - t0
-
- if elapsed >= timeout
- msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
- [timeout, elapsed]
- raise ConnectionTimeoutError, msg
- end
- end
- ensure
- @num_waiting -= 1
- end
- end
-
include MonitorMixin
attr_accessor :automatic_reconnect
- attr_reader :spec, :connections, :size
+ attr_reader :spec, :connections
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
@@ -207,6 +76,7 @@ module ActiveRecord
# The cache of reserved connections mapped to threads
@reserved_connections = {}
+ @queue = new_cond
@timeout = spec.config[:wait_timeout] || 5
# default max pool size to 5
@@ -214,16 +84,6 @@ module ActiveRecord
@connections = []
@automatic_reconnect = true
-
- @available = Queue.new self
- end
-
- # Hack for tests to be able to add connections. Do not call outside of tests
- def insert_connection_for_test!(c) #:nodoc:
- synchronize do
- @connections << c
- @available.add c
- end
end
# Retrieve the connection associated with the current thread, or call
@@ -279,7 +139,6 @@ module ActiveRecord
conn.disconnect!
end
@connections = []
- @available.clear
end
end
@@ -291,15 +150,9 @@ module ActiveRecord
checkin conn
conn.disconnect! if conn.requires_reloading?
end
-
@connections.delete_if do |conn|
conn.requires_reloading?
end
- @available.clear
- @connections.each do |conn|
- @available.add conn
- end
-
end
end
@@ -363,22 +216,58 @@ connection. For example: ActiveRecord::Base.connection.close
# Check-out a database connection from the pool, indicating that you want
# to use it. You should call #checkin when you no longer need this.
#
- # This is done by either returning and leasing existing connection, or by
- # creating a new connection and leasing it.
- #
- # If all connections are leased and the pool is at capacity (meaning the
- # number of currently leased connections is greater than or equal to the
- # size limit set), an ActiveRecord::PoolFullError exception will be raised.
+ # This is done by either returning an existing connection, or by creating
+ # a new connection. If the maximum number of connections for this pool has
+ # already been reached, but the pool is empty (i.e. they're all being used),
+ # then this method will wait until a thread has checked in a connection.
+ # The wait time is bounded however: if no connection can be checked out
+ # within the timeout specified for this pool, then a ConnectionTimeoutError
+ # exception will be raised.
#
# Returns: an AbstractAdapter object.
#
# Raises:
- # - PoolFullError: no connection can be obtained from the pool.
+ # - ConnectionTimeoutError: no connection can be obtained from the pool
+ # within the timeout period.
def checkout
synchronize do
- conn = acquire_connection
- conn.lease
- checkout_and_verify(conn)
+ waited_time = 0
+
+ loop do
+ conn = @connections.find { |c| c.lease }
+
+ unless conn
+ if @connections.size < @size
+ conn = checkout_new_connection
+ conn.lease
+ end
+ end
+
+ if conn
+ checkout_and_verify conn
+ return conn
+ end
+
+ if waited_time >= @timeout
+ raise ConnectionTimeoutError, "could not obtain a database connection#{" within #{@timeout} seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #{@size}; consider increasing it."
+ end
+
+ # Sometimes our wait can end because a connection is available,
+ # but another thread can snatch it up first. If timeout hasn't
+ # passed but no connection is avail, looks like that happened --
+ # loop and wait again, for the time remaining on our timeout.
+ before_wait = Time.now
+ @queue.wait( [@timeout - waited_time, 0].max )
+ waited_time += (Time.now - before_wait)
+
+ # Will go away in Rails 4, when we don't clean up
+ # after leaked connections automatically anymore. Right now, clean
+ # up after we've returned from a 'wait' if it looks like it's
+ # needed, then loop and try again.
+ if(active_connections.size >= @connections.size)
+ clear_stale_cached_connections!
+ end
+ end
end
end
@@ -391,40 +280,10 @@ connection. For example: ActiveRecord::Base.connection.close
synchronize do
conn.run_callbacks :checkin do
conn.expire
+ @queue.signal
end
release conn
-
- @available.add conn
- end
- end
-
- # Acquire a connection by one of 1) immediately removing one
- # from the queue of available connections, 2) creating a new
- # connection if the pool is not at capacity, 3) waiting on the
- # queue for a connection to become available (first calling
- # clear_stale_cached_connections! to clean up leaked connections,
- # this cleanup will prob be going away in Rails4).
- #
- # Raises:
- # - ConnectionTimeoutError if a connection could not be acquired
- def acquire_connection
- if conn = @available.poll
- conn
- elsif @connections.size < @size
- checkout_new_connection
- else
- # this conditional clear_stale will go away in Rails 4, when we don't
- # clean up after leaked connections automatically anymore. Right now,
- # clean up after we've returned from a 'wait' if it looks like it's
- # needed before trying to wait for a connection.
- synchronize do
- if(active_connections.size >= @connections.size)
- clear_stale_cached_connections!
- end
- end
-
- @available.poll(@timeout)
end
end
diff --git a/activerecord/test/cases/associations/eager_test.rb b/activerecord/test/cases/associations/eager_test.rb
index 544c01461d..bf01b46852 100644
--- a/activerecord/test/cases/associations/eager_test.rb
+++ b/activerecord/test/cases/associations/eager_test.rb
@@ -929,10 +929,6 @@ class EagerAssociationTest < ActiveRecord::TestCase
end
def test_eager_loading_with_conditions_on_joined_table_preloads
- # cache metadata in advance to avoid extra sql statements executed while testing
- Tagging.first
- Tag.first
-
posts = assert_queries(2) do
Post.find(:all, :select => 'distinct posts.*', :include => :author, :joins => [:comments], :conditions => "comments.body like 'Thank you%'", :order => 'posts.id')
end
@@ -981,9 +977,7 @@ class EagerAssociationTest < ActiveRecord::TestCase
end
def test_eager_loading_with_conditions_on_join_model_preloads
- # cache metadata in advance to avoid extra sql statements executed while testing
Author.columns
- AuthorAddress.first
authors = assert_queries(2) do
Author.find(:all, :include => :author_address, :joins => :comments, :conditions => "posts.title like 'Welcome%'")
diff --git a/activerecord/test/cases/associations/has_many_associations_test.rb b/activerecord/test/cases/associations/has_many_associations_test.rb
index 618daa1b35..30cfcc53d2 100644
--- a/activerecord/test/cases/associations/has_many_associations_test.rb
+++ b/activerecord/test/cases/associations/has_many_associations_test.rb
@@ -1470,8 +1470,6 @@ class HasManyAssociationsTest < ActiveRecord::TestCase
end
def test_custom_primary_key_on_new_record_should_fetch_with_query
- Essay.first # cache metadata in advance to avoid extra sql statements executed while testing
-
author = Author.new(:name => "David")
assert !author.essays.loaded?
diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
index 3dcde699fc..7af9079b48 100644
--- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
+++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb
@@ -36,7 +36,7 @@ module ActiveRecord
def test_close
pool = ConnectionPool.new(Base::ConnectionSpecification.new({}, nil))
- pool.insert_connection_for_test! adapter
+ pool.connections << adapter
adapter.pool = pool
# Make sure the pool marks the connection in use
diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb
index 2ada2c1fe1..5cecfa90e7 100644
--- a/activerecord/test/cases/connection_pool_test.rb
+++ b/activerecord/test/cases/connection_pool_test.rb
@@ -128,110 +128,6 @@ module ActiveRecord
end
- # The connection pool is "fair" if threads waiting for
- # connections receive them the order in which they began
- # waiting. This ensures that we don't timeout one HTTP request
- # even while well under capacity in a multi-threaded environment
- # such as a Java servlet container.
- #
- # We don't need strict fairness: if two connections become
- # available at the same time, it's fine of two threads that were
- # waiting acquire the connections out of order.
- #
- # Thus this test prepares waiting threads and then trickles in
- # available connections slowly, ensuring the wakeup order is
- # correct in this case.
- def test_checkout_fairness
- @pool.instance_variable_set(:@size, 10)
- expected = (1..@pool.size).to_a.freeze
- # check out all connections so our threads start out waiting
- conns = expected.map { @pool.checkout }
- mutex = Mutex.new
- order = []
- errors = []
-
- threads = expected.map do |i|
- t = Thread.new {
- begin
- @pool.checkout # connection return value never checked back in
- mutex.synchronize { order << i }
- rescue => e
- mutex.synchronize { errors << e }
- end
- }
- Thread.pass until t.status == "sleep"
- t
- end
-
- # this should wake up the waiting threads one by one in order
- conns.each { |conn| @pool.checkin(conn); sleep 0.1 }
-
- threads.each(&:join)
-
- raise errors.first if errors.any?
-
- assert_equal(expected, order)
- end
-
- # As mentioned in #test_checkout_fairness, we don't care about
- # strict fairness. This test creates two groups of threads:
- # group1 whose members all start waiting before any thread in
- # group2. Enough connections are checked in to wakeup all
- # group1 threads, and the fact that only group1 and no group2
- # threads acquired a connection is enforced.
- def test_checkout_fairness_by_group
- @pool.instance_variable_set(:@size, 10)
- # take all the connections
- conns = (1..10).map { @pool.checkout }
- mutex = Mutex.new
- successes = [] # threads that successfully got a connection
- errors = []
-
- make_thread = proc do |i|
- t = Thread.new {
- begin
- @pool.checkout # connection return value never checked back in
- mutex.synchronize { successes << i }
- rescue => e
- mutex.synchronize { errors << e }
- end
- }
- Thread.pass until t.status == "sleep"
- t
- end
-
- # all group1 threads start waiting before any in group2
- group1 = (1..5).map(&make_thread)
- group2 = (6..10).map(&make_thread)
-
- # checkin n connections back to the pool
- checkin = proc do |n|
- n.times do
- c = conns.pop
- @pool.checkin(c)
- end
- end
-
- checkin.call(group1.size) # should wake up all group1
-
- loop do
- sleep 0.1
- break if mutex.synchronize { (successes.size + errors.size) == group1.size }
- end
-
- winners = mutex.synchronize { successes.dup }
- checkin.call(group2.size) # should wake up everyone remaining
-
- group1.each(&:join)
- group2.each(&:join)
-
- assert_equal((1..group1.size).to_a, winners.sort)
-
- if errors.any?
- raise errors.first
- end
- end
-
def test_automatic_reconnect=
pool = ConnectionPool.new ActiveRecord::Base.connection_pool.spec
assert pool.automatic_reconnect
diff --git a/activerecord/test/cases/dirty_test.rb b/activerecord/test/cases/dirty_test.rb
index e1129704a6..3bd2e909d7 100644
--- a/activerecord/test/cases/dirty_test.rb
+++ b/activerecord/test/cases/dirty_test.rb
@@ -325,8 +325,6 @@ class DirtyTest < ActiveRecord::TestCase
end
def test_partial_update_with_optimistic_locking
- Person.first # cache metadata in advance to avoid extra sql statements executed while testing
-
person = Person.new(:first_name => 'foo')
old_lock_version = 1
@@ -532,7 +530,7 @@ class DirtyTest < ActiveRecord::TestCase
pirate = target.create(:created_on => created_on)
pirate.reload # Here mysql truncate the usec value to 0
-
+
pirate.created_on = created_on
assert !pirate.created_on_changed?
end