diff options
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb')
-rw-r--r-- | activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb | 375 |
1 files changed, 187 insertions, 188 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index dc5b305843..d0c5bbe17d 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -1,6 +1,6 @@ -require 'thread' -require 'concurrent/map' -require 'monitor' +require "thread" +require "concurrent/map" +require "monitor" module ActiveRecord # Raised when a connection could not be obtained within the connection @@ -150,61 +150,61 @@ module ActiveRecord private - def internal_poll(timeout) - no_wait_poll || (timeout && wait_poll(timeout)) - end + def internal_poll(timeout) + no_wait_poll || (timeout && wait_poll(timeout)) + end - def synchronize(&block) - @lock.synchronize(&block) - end + def synchronize(&block) + @lock.synchronize(&block) + end # Test if the queue currently contains any elements. - def any? - !@queue.empty? - end + def any? + !@queue.empty? + end # A thread can remove an element from the queue without # waiting if and only if the number of currently available # connections is strictly greater than the number of waiting # threads. - def can_remove_no_wait? - @queue.size > @num_waiting - end + def can_remove_no_wait? + @queue.size > @num_waiting + end # Removes and returns the head of the queue if possible, or nil. - def remove - @queue.shift - end + def remove + @queue.shift + end # Remove and return the head the queue if the number of # available elements is strictly greater than the number of # threads currently waiting. Otherwise, return nil. - def no_wait_poll - remove if can_remove_no_wait? - end + def no_wait_poll + remove if can_remove_no_wait? + end # Waits on the queue up to +timeout+ seconds, then removes and # returns the head of the queue. - def wait_poll(timeout) - @num_waiting += 1 + def wait_poll(timeout) + @num_waiting += 1 - t0 = Time.now - elapsed = 0 - loop do - @cond.wait(timeout - elapsed) + t0 = Time.now + elapsed = 0 + loop do + @cond.wait(timeout - elapsed) - return remove if any? + return remove if any? - elapsed = Time.now - t0 - if elapsed >= timeout - msg = 'could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use' % - [timeout, elapsed] - raise ConnectionTimeoutError, msg + elapsed = Time.now - t0 + if elapsed >= timeout + msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" % + [timeout, elapsed] + raise ConnectionTimeoutError, msg + end end + ensure + @num_waiting -= 1 end - ensure - @num_waiting -= 1 - end end # Adds the ability to turn a basic fair FIFO queue into one @@ -274,11 +274,11 @@ module ActiveRecord include BiasableQueue private - def internal_poll(timeout) - conn = super - conn.lease if conn - conn - end + def internal_poll(timeout) + conn = super + conn.lease if conn + conn + end end # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. @@ -339,7 +339,7 @@ module ActiveRecord # that case +conn.owner+ attr should be consulted. # Access and modification of +@thread_cached_conns+ does not require # synchronization. - @thread_cached_conns = Concurrent::Map.new(:initial_capacity => @size) + @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size) @connections = [] @automatic_reconnect = true @@ -584,111 +584,111 @@ module ActiveRecord private #-- # this is unfortunately not concurrent - def bulk_make_new_connections(num_new_conns_needed) - num_new_conns_needed.times do - # try_to_checkout_new_connection will not exceed pool's @size limit - if new_conn = try_to_checkout_new_connection - # make the new_conn available to the starving threads stuck @available Queue - checkin(new_conn) + def bulk_make_new_connections(num_new_conns_needed) + num_new_conns_needed.times do + # try_to_checkout_new_connection will not exceed pool's @size limit + if new_conn = try_to_checkout_new_connection + # make the new_conn available to the starving threads stuck @available Queue + checkin(new_conn) + end end end - end #-- # From the discussion on GitHub: # https://github.com/rails/rails/pull/14938#commitcomment-6601951 # This hook-in method allows for easier monkey-patching fixes needed by # JRuby users that use Fibers. - def connection_cache_key(thread) - thread - end + def connection_cache_key(thread) + thread + end # Take control of all existing connections so a "group" action such as # reload/disconnect can be performed safely. It is no longer enough to # wrap it in +synchronize+ because some pool's actions are allowed # to be performed outside of the main +synchronize+ block. - def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true) - with_new_connections_blocked do - attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout) - yield + def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true) + with_new_connections_blocked do + attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout) + yield + end end - end - def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true) - collected_conns = synchronize do - # account for our own connections - @connections.select {|conn| conn.owner == Thread.current} - end + def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true) + collected_conns = synchronize do + # account for our own connections + @connections.select { |conn| conn.owner == Thread.current } + end - newly_checked_out = [] - timeout_time = Time.now + (@checkout_timeout * 2) + newly_checked_out = [] + timeout_time = Time.now + (@checkout_timeout * 2) - @available.with_a_bias_for(Thread.current) do - loop do - synchronize do - return if collected_conns.size == @connections.size && @now_connecting == 0 - remaining_timeout = timeout_time - Time.now - remaining_timeout = 0 if remaining_timeout < 0 - conn = checkout_for_exclusive_access(remaining_timeout) - collected_conns << conn - newly_checked_out << conn + @available.with_a_bias_for(Thread.current) do + loop do + synchronize do + return if collected_conns.size == @connections.size && @now_connecting == 0 + remaining_timeout = timeout_time - Time.now + remaining_timeout = 0 if remaining_timeout < 0 + conn = checkout_for_exclusive_access(remaining_timeout) + collected_conns << conn + newly_checked_out << conn + end end end - end - rescue ExclusiveConnectionTimeoutError - # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any - # timeouts and are expected to just give up: we've obtained as many connections - # as possible, note that in a case like that we don't return any of the - # +newly_checked_out+ connections. - - if raise_on_acquisition_timeout + rescue ExclusiveConnectionTimeoutError + # <tt>raise_on_acquisition_timeout == false</tt> means we are directed to ignore any + # timeouts and are expected to just give up: we've obtained as many connections + # as possible, note that in a case like that we don't return any of the + # +newly_checked_out+ connections. + + if raise_on_acquisition_timeout + release_newly_checked_out = true + raise + end + rescue Exception # if something else went wrong + # this can't be a "naked" rescue, because we have should return conns + # even for non-StandardErrors release_newly_checked_out = true raise + ensure + if release_newly_checked_out && newly_checked_out + # releasing only those conns that were checked out in this method, conns + # checked outside this method (before it was called) are not for us to release + newly_checked_out.each { |conn| checkin(conn) } + end end - rescue Exception # if something else went wrong - # this can't be a "naked" rescue, because we have should return conns - # even for non-StandardErrors - release_newly_checked_out = true - raise - ensure - if release_newly_checked_out && newly_checked_out - # releasing only those conns that were checked out in this method, conns - # checked outside this method (before it was called) are not for us to release - newly_checked_out.each {|conn| checkin(conn)} - end - end #-- # Must be called in a synchronize block. - def checkout_for_exclusive_access(checkout_timeout) - checkout(checkout_timeout) - rescue ConnectionTimeoutError - # this block can't be easily moved into attempt_to_checkout_all_existing_connections's - # rescue block, because doing so would put it outside of synchronize section, without - # being in a critical section thread_report might become inaccurate - msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds" - - thread_report = [] - @connections.each do |conn| - unless conn.owner == Thread.current - thread_report << "#{conn} is owned by #{conn.owner}" + def checkout_for_exclusive_access(checkout_timeout) + checkout(checkout_timeout) + rescue ConnectionTimeoutError + # this block can't be easily moved into attempt_to_checkout_all_existing_connections's + # rescue block, because doing so would put it outside of synchronize section, without + # being in a critical section thread_report might become inaccurate + msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds" + + thread_report = [] + @connections.each do |conn| + unless conn.owner == Thread.current + thread_report << "#{conn} is owned by #{conn.owner}" + end end - end - msg << " (#{thread_report.join(', ')})" if thread_report.any? + msg << " (#{thread_report.join(', ')})" if thread_report.any? - raise ExclusiveConnectionTimeoutError, msg - end + raise ExclusiveConnectionTimeoutError, msg + end - def with_new_connections_blocked - previous_value = nil - synchronize do - previous_value, @new_cons_enabled = @new_cons_enabled, false + def with_new_connections_blocked + previous_value = nil + synchronize do + previous_value, @new_cons_enabled = @new_cons_enabled, false + end + yield + ensure + synchronize { @new_cons_enabled = previous_value } end - yield - ensure - synchronize { @new_cons_enabled = previous_value } - end # Acquire a connection by one of 1) immediately removing one # from the queue of available connections, 2) creating a new @@ -701,86 +701,86 @@ module ActiveRecord #-- # Implementation detail: the connection returned by +acquire_connection+ # will already be "+connection.lease+ -ed" to the current thread. - def acquire_connection(checkout_timeout) - # NOTE: we rely on +@available.poll+ and +try_to_checkout_new_connection+ to - # +conn.lease+ the returned connection (and to do this in a +synchronized+ - # section). This is not the cleanest implementation, as ideally we would - # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to +@available.poll+ - # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections - # of the said methods and avoid an additional +synchronize+ overhead. - if conn = @available.poll || try_to_checkout_new_connection - conn - else - reap - @available.poll(checkout_timeout) + def acquire_connection(checkout_timeout) + # NOTE: we rely on +@available.poll+ and +try_to_checkout_new_connection+ to + # +conn.lease+ the returned connection (and to do this in a +synchronized+ + # section). This is not the cleanest implementation, as ideally we would + # <tt>synchronize { conn.lease }</tt> in this method, but by leaving it to +@available.poll+ + # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections + # of the said methods and avoid an additional +synchronize+ overhead. + if conn = @available.poll || try_to_checkout_new_connection + conn + else + reap + @available.poll(checkout_timeout) + end end - end #-- # if owner_thread param is omitted, this must be called in synchronize block - def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) - @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn) - end - alias_method :release, :remove_connection_from_thread_cache + def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) + @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn) + end + alias_method :release, :remove_connection_from_thread_cache - def new_connection - Base.send(spec.adapter_method, spec.config).tap do |conn| - conn.schema_cache = schema_cache.dup if schema_cache + def new_connection + Base.send(spec.adapter_method, spec.config).tap do |conn| + conn.schema_cache = schema_cache.dup if schema_cache + end end - end # If the pool is not at a +@size+ limit, establish new connection. Connecting # to the DB is done outside main synchronized section. #-- # Implementation constraint: a newly established connection returned by this # method must be in the +.leased+ state. - def try_to_checkout_new_connection - # first in synchronized section check if establishing new conns is allowed - # and increment @now_connecting, to prevent overstepping this pool's @size - # constraint - do_checkout = synchronize do - if @new_cons_enabled && (@connections.size + @now_connecting) < @size - @now_connecting += 1 - end - end - if do_checkout - begin - # if successfully incremented @now_connecting establish new connection - # outside of synchronized section - conn = checkout_new_connection - ensure - synchronize do - if conn - adopt_connection(conn) - # returned conn needs to be already leased - conn.lease + def try_to_checkout_new_connection + # first in synchronized section check if establishing new conns is allowed + # and increment @now_connecting, to prevent overstepping this pool's @size + # constraint + do_checkout = synchronize do + if @new_cons_enabled && (@connections.size + @now_connecting) < @size + @now_connecting += 1 + end + end + if do_checkout + begin + # if successfully incremented @now_connecting establish new connection + # outside of synchronized section + conn = checkout_new_connection + ensure + synchronize do + if conn + adopt_connection(conn) + # returned conn needs to be already leased + conn.lease + end + @now_connecting -= 1 end - @now_connecting -= 1 end end end - end - def adopt_connection(conn) - conn.pool = self - @connections << conn - end + def adopt_connection(conn) + conn.pool = self + @connections << conn + end - def checkout_new_connection - raise ConnectionNotEstablished unless @automatic_reconnect - new_connection - end + def checkout_new_connection + raise ConnectionNotEstablished unless @automatic_reconnect + new_connection + end - def checkout_and_verify(c) - c._run_checkout_callbacks do - c.verify! + def checkout_and_verify(c) + c._run_checkout_callbacks do + c.verify! + end + c + rescue + remove c + c.disconnect! + raise end - c - rescue - remove c - c.disconnect! - raise - end end # ConnectionHandler is a collection of ConnectionPool objects. It is used @@ -833,8 +833,8 @@ module ActiveRecord class ConnectionHandler def initialize # These caches are keyed by spec.name (ConnectionSpecification#name). - @owner_to_pool = Concurrent::Map.new(:initial_capacity => 2) do |h,k| - h[k] = Concurrent::Map.new(:initial_capacity => 2) + @owner_to_pool = Concurrent::Map.new(initial_capacity: 2) do |h,k| + h[k] = Concurrent::Map.new(initial_capacity: 2) end end @@ -848,7 +848,6 @@ module ActiveRecord spec = resolver.spec(config) remove_connection(spec.name) - owner_to_pool[spec.name] = ConnectionAdapters::ConnectionPool.new(spec) message_bus = ActiveSupport::Notifications.instrumenter payload = { @@ -859,7 +858,7 @@ module ActiveRecord payload[:config] = spec.config end - message_bus.instrument('!connection.active_record', payload) do + message_bus.instrument("!connection.active_record", payload) do owner_to_pool[spec.name] = ConnectionAdapters::ConnectionPool.new(spec) end @@ -943,14 +942,14 @@ module ActiveRecord private - def owner_to_pool - @owner_to_pool[Process.pid] - end + def owner_to_pool + @owner_to_pool[Process.pid] + end - def pool_from_any_process_for(spec_name) - owner_to_pool = @owner_to_pool.values.find { |v| v[spec_name] } - owner_to_pool && owner_to_pool[spec_name] - end + def pool_from_any_process_for(spec_name) + owner_to_pool = @owner_to_pool.values.find { |v| v[spec_name] } + owner_to_pool && owner_to_pool[spec_name] + end end end end |