require 'thread'
require 'monitor'
require 'set'
require 'active_support/core_ext/module/deprecation'
require 'timeout'

module ActiveRecord
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period.
  #
  # NOTE: the ConnectionPool defined in this file signals an exhausted pool
  # with PoolFullError, not with this class.
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

  # Raised when a connection pool is full and another connection is requested
  class PoolFullError < ConnectionNotEstablished
  end

  module ConnectionAdapters
    # Connection pool base class for managing Active Record database
    # connections.
    #
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # database connections. The basic idea is that each thread checks out a
    # database connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
    # 1. Simply use ActiveRecord::Base.connection as with Active Record 2.1 and
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
    #    default behavior for Active Record when used in conjunction with
    #    Action Pack's request handling cycle.
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
    #
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
    # There are two connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +wait_timeout+: number of seconds to block and wait for a connection
    #   before giving up and raising ActiveRecord::PoolFullError
    #   (default 5 seconds).
    class ConnectionPool
      # Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
      # A reaper instantiated with a nil frequency will never reap the
      # connection pool.
      #
      # Configure the frequency by setting "reaping_frequency" in your
      # database yaml file.
      class Reaper
        attr_reader :pool, :frequency

        def initialize(pool, frequency)
          @pool      = pool
          @frequency = frequency
        end

        # Starts the background reaping thread and returns it, or returns
        # nil without starting anything when no frequency was configured.
        def run
          return unless frequency

          Thread.new(frequency, pool) do |interval, reaped_pool|
            loop do
              sleep interval
              reaped_pool.reap
            end
          end
        end
      end

      include MonitorMixin

      attr_accessor :automatic_reconnect, :timeout
      attr_reader :spec, :connections, :size, :reaper

      # Minimal broadcast latch built on Mutex and ConditionVariable.
      #
      # The pool itself no longer waits on a Latch (see #checkout, which
      # waits on a condition variable of the pool's own monitor instead);
      # the class is kept so that existing references to
      # ConnectionPool::Latch keep resolving.
      class Latch # :nodoc:
        def initialize
          @mutex = Mutex.new
          @cond  = ConditionVariable.new
        end

        # Wakes up every thread currently blocked in #await.
        def release
          @mutex.synchronize { @cond.broadcast }
        end

        # Blocks the calling thread until another thread calls #release.
        def await
          @mutex.synchronize { @cond.wait @mutex }
        end
      end

      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
      def initialize(spec)
        super()

        @spec = spec

        # The cache of reserved connections mapped to threads
        @reserved_connections = {}

        @timeout = spec.config[:wait_timeout] || 5

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        # Signalled (while holding the pool monitor) whenever a connection is
        # checked in or removed, so that threads blocked in #checkout re-check
        # the pool. Must be created after super() has set up the monitor.
        @available = new_cond

        @connections         = []
        @automatic_reconnect = true

        # Start the reaper last: its thread calls #reap, which reads
        # @connections, so the pool must be fully initialized first.
        @reaper = Reaper.new self, spec.config[:reaping_frequency]
        @reaper.run
      end

      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the thread id.
      def connection
        @reserved_connections[current_connection_id] ||= checkout
      end

      # Is there an open connection that is being used for the current thread?
      def active_connection?
        @reserved_connections.fetch(current_connection_id) {
          return false
        }.in_use?
      end

      # Signal that the thread is finished with the current connection.
      # #release_connection releases the connection-thread association
      # and returns the connection to the pool.
      def release_connection(with_id = current_connection_id)
        synchronize do
          conn = @reserved_connections.delete(with_id)
          checkin conn if conn
        end
      end

      # If a connection already exists yield it to the block. If no connection
      # exists checkout a connection, yield it to the block, and checkin the
      # connection when finished.
      def with_connection
        connection_id = current_connection_id
        fresh_connection = true unless active_connection?
        yield connection
      ensure
        release_connection(connection_id) if fresh_connection
      end

      # Returns true if a connection has already been opened.
      def connected?
        synchronize { @connections.any? }
      end

      # Disconnects all connections in the pool, and clears the pool.
      def disconnect!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect!
          end
          @connections = []
        end
      end

      # Checks every connection back in, then disconnects and drops from the
      # pool those connections that report +requires_reloading?+.
      def clear_reloadable_connections!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect! if conn.requires_reloading?
          end
          @connections.delete_if do |conn|
            conn.requires_reloading?
          end
        end
      end

      def clear_stale_cached_connections! # :nodoc:
        reap
      end
      deprecate :clear_stale_cached_connections! => "Please use #reap instead"

      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # This is done by either returning and leasing existing connection, or by
      # creating a new connection and leasing it.
      #
      # If all connections are leased and the pool is at capacity (meaning the
      # number of currently leased connections is greater than or equal to the
      # size limit set), the calling thread waits for a connection to be
      # checked in or removed. If none becomes available within +timeout+
      # seconds in total, an ActiveRecord::PoolFullError exception is raised.
      # A +timeout+ of nil or 0 waits without a time limit.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - PoolFullError: no connection can be obtained from the pool.
      def checkout
        synchronize do
          # One deadline for the whole call, so that losing the race for a
          # freed connection does not restart the full timeout.
          deadline = Time.now + @timeout if @timeout && @timeout > 0

          loop do
            # Try to find a connection that hasn't been leased, and lease it
            conn = connections.find { |c| c.lease }

            # If all connections were leased, and we have room to expand,
            # create a new connection and lease it.
            if !conn && connections.size < size
              conn = checkout_new_connection
              conn.lease
            end

            return checkout_and_verify(conn) if conn

            remaining = deadline && deadline - Time.now
            if remaining && remaining <= 0
              raise PoolFullError,
                "could not obtain a database connection within #{@timeout} " \
                "seconds. The max pool size is currently #{size}; " \
                "consider increasing it."
            end

            # Waiting on the monitor's own condition variable releases the
            # monitor atomically, so a #checkin that happens after the scan
            # above cannot be missed.
            @available.wait(remaining)
          end
        end
      end

      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained earlier by
      # calling +checkout+ on this pool.
      def checkin(conn)
        synchronize do
          conn.run_callbacks :checkin do
            conn.expire
          end

          release conn

          # Wake threads blocked in #checkout.
          @available.broadcast
        end
      end

      # Remove a connection from the connection pool. The connection will
      # remain open and active but will no longer be managed by this pool.
      def remove(conn)
        synchronize do
          @connections.delete conn

          # FIXME: we might want to store the key on the connection so that removing
          # from the reserved hash will be a little easier.
          release conn

          # A slot was freed, so a waiting #checkout may now create a new
          # connection.
          @available.broadcast
        end
      end

      # Removes dead connections from the pool. A dead connection can occur
      # if a programmer forgets to close a connection at the end of a thread
      # or a thread dies unexpectedly.
      #
      # A connection is removed when it is in use, was last used more than
      # +timeout+ seconds ago, and no longer reports itself as active.
      def reap
        synchronize do
          stale = Time.now - @timeout
          # Iterate over a copy: #remove mutates @connections, and it also
          # wakes any threads waiting in #checkout.
          connections.dup.each do |conn|
            remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
          end
        end
      end

      private

      # Drops +conn+ from the thread => connection cache, whichever thread
      # id it is currently reserved under.
      def release(conn)
        thread_id =
          if @reserved_connections[current_connection_id] == conn
            current_connection_id
          else
            @reserved_connections.keys.find { |k|
              @reserved_connections[k] == conn
            }
          end

        @reserved_connections.delete thread_id if thread_id
      end

      # Builds a connection by calling the spec's adapter method on
      # ActiveRecord::Base with the spec's configuration.
      def new_connection
        ActiveRecord::Base.send(spec.adapter_method, spec.config)
      end

      def current_connection_id #:nodoc:
        ActiveRecord::Base.connection_id ||= Thread.current.object_id
      end

      # Creates a connection, ties it to this pool and adds it to the list of
      # managed connections. Raises ConnectionNotEstablished when automatic
      # reconnection has been switched off.
      def checkout_new_connection
        raise ConnectionNotEstablished unless @automatic_reconnect

        c = new_connection
        c.pool = self
        @connections << c
        c
      end

      # Runs the connection's :checkout callbacks around +verify!+ and
      # returns the connection.
      def checkout_and_verify(c)
        c.run_callbacks :checkout do
          c.verify!
        end
        c
      end
    end

    # ConnectionHandler is a collection of ConnectionPool objects. It is used
    # for keeping separate connection pools for Active Record models that connect
    # to different databases.
    #
    # For example, suppose that you have 5 models, with the following hierarchy:
    #
    #  |
    #  +-- Book
    #  |    |
    #  |    +-- ScaryBook
    #  |    +-- GoodBook
    #  +-- Author
    #  +-- BankAccount
    #
    # Suppose that Book is to connect to a separate database (i.e. one other
    # than the default database). Then Book, ScaryBook and GoodBook will all use
    # the same connection pool. Likewise, Author and BankAccount will use the
    # same connection pool. However, the connection pool used by Author/BankAccount
    # is not the same as the one used by Book/ScaryBook/GoodBook.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
    # ActiveRecord::Base.connection_handler. Active Record models use this to
    # determine the connection pool that they should use.
    class ConnectionHandler
      # +pools+ maps a process id to a hash of spec => ConnectionPool.
      def initialize(pools = Hash.new { |h,k| h[k] = {} })
        @connection_pools = pools
        @class_to_pool    = Hash.new { |h,k| h[k] = {} }
      end

      # The spec => pool hash belonging to the current process.
      def connection_pools
        @connection_pools[Process.pid]
      end

      # Creates a pool for +spec+ and registers it under the class name
      # +name+. Returns the new pool.
      def establish_connection(name, spec)
        set_pool_for_spec spec, ConnectionAdapters::ConnectionPool.new(spec)
        set_class_to_pool name, connection_pools[spec]
      end

      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
        connection_pools.values.any? { |pool| pool.active_connection? }
      end

      # Returns any connections in use by the current thread back to their
      # pools.
      def clear_active_connections!
        connection_pools.each_value {|pool| pool.release_connection }
      end

      # Asks every pool to drop its connections that require reloading.
      def clear_reloadable_connections!
        connection_pools.each_value {|pool| pool.clear_reloadable_connections! }
      end

      # Disconnects every connection in every pool of the current process.
      def clear_all_connections!
        connection_pools.each_value {|pool| pool.disconnect! }
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) || raise(ConnectionNotEstablished)
      end

      # Returns true if a connection that's accessible to this class has
      # already been opened.
      def connected?(klass)
        conn = retrieve_connection_pool(klass)
        conn && conn.connected?
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      def remove_connection(klass)
        pool = class_to_pool.delete(klass.name)
        return nil unless pool

        connection_pools.delete pool.spec
        pool.automatic_reconnect = false
        pool.disconnect!
        pool.spec.config
      end

      # Walks up from +klass+ through +active_record_super+ until a class with
      # a registered pool is found. Returns nil once ActiveRecord::Model is
      # reached without finding one.
      def retrieve_connection_pool(klass)
        pool = get_pool_for_class klass.name
        return pool if pool
        return nil if ActiveRecord::Model == klass
        retrieve_connection_pool klass.active_record_super
      end

      private

      # The class name => pool hash belonging to the current process.
      def class_to_pool
        @class_to_pool[Process.pid]
      end

      def set_pool_for_spec(spec, pool)
        @connection_pools[Process.pid][spec] = pool
      end

      def set_class_to_pool(name, pool)
        @class_to_pool[Process.pid][name] = pool
        pool
      end

      # Looks up the pool registered for the class name +klass+ in the
      # current process. On a miss, it searches the mappings recorded under
      # every process id; if one of them has a pool for +klass+ (presumably a
      # parent process, after a fork -- TODO confirm), a fresh pool is built
      # from that pool's spec and registered for this process. Otherwise nil
      # is cached for +klass+.
      def get_pool_for_class(klass)
        @class_to_pool[Process.pid].fetch(klass) {
          c_to_p = @class_to_pool.values.find { |class_to_pool|
            class_to_pool[klass]
          }

          if c_to_p
            pool = c_to_p[klass]
            pool = ConnectionAdapters::ConnectionPool.new pool.spec
            set_pool_for_spec pool.spec, pool
            set_class_to_pool klass, pool
          else
            set_class_to_pool klass, nil
          end
        }
      end
    end

    # Rack middleware that returns the current thread's connections to their
    # pools once the response body has been closed, or straight away when the
    # wrapped application raises. Nothing is cleared when the Rack env
    # contains the 'rack.test' key.
    class ConnectionManagement
      def initialize(app)
        @app = app
      end

      def call(env)
        testing = env.key?('rack.test')

        response = @app.call(env)
        # Defer the cleanup until the body is closed so that the connection
        # stays available while the body is being produced.
        response[2] = ::Rack::BodyProxy.new(response[2]) do
          ActiveRecord::Base.clear_active_connections! unless testing
        end

        response
      rescue Exception
        # Deliberately broader than StandardError: the connection must be
        # released whatever is propagating, and the exception is re-raised
        # untouched.
        ActiveRecord::Base.clear_active_connections! unless testing
        raise
      end
    end
  end
end