aboutsummaryrefslogblamecommitdiffstats
path: root/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
blob: 988f85e909599ab4686e39bc12869dbcae19d1d6 (plain) (tree)
1
2
3
4
5
6
7
8
9
                 

             
                   




                                                                        
                           





                                                                            





                                                                              






                                                                              
                        
                                            





                                                                        
                           
                                                            


                                                                               
            
                                             
           
         
 

                                                                   
 
                          
                    

                                                             

                                                   

         





                                                                           



                                                               
                                                                  
           

         
                                                                       
                                                                      
                                               
                            
                                                                   









                                                                               

         


                                                             

         


                                                 
                      

                                  

                          
                                  

         
                                           
                                       
                                                  
                      


                                  

                                     
                                  



             

                                                                       
                                             
                                                                           
                      
           
                                        
                                                  


           



                                                                   
 


                                                                  

         
                                          
                                                                            
         
                                
 
                              
                                                                      
         

                          
                                                   



                                                                    




                                                                                                      


                                         
 










                                                        

           

       


                                                               


                                                     
 
                                                        
 
                  

                      
 
                       

                        
 
             






                                                         
       
 














                                                                              












                                                  
                                                  
                                        

                                         















                                                                                                      
                                  












                                      
                                               












                                 



                                   
 

                                


                                          
                                                                                 

         

                                                      









                                                           
                                                                      







                                                                                 
                                                               



                                             
                                                                              










                                                                            

                                                                        














                                                                         





                                                   
           
         
       
























                                                                                     

     
require 'monitor'
require 'set'

module ActiveRecord
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period.
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

  module ConnectionAdapters
    # Connection pool base class for managing ActiveRecord database
    # connections.
    #
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
    # 1. Simply use ActiveRecord::Base.connection as with ActiveRecord 2.1 and
    #    earlier (pre-connection-pooling). Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    ActiveRecord::Base.clear_active_connections!. This will be the
    #    default behavior for ActiveRecord when used in conjunction with
    #    ActionPack's request handling cycle.
    # 2. Manually check out a connection from the pool with
    #    ActiveRecord::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    ActiveRecord::Base.connection_pool.checkin(connection).
    # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
    class ConnectionPool
      # Factory method for connection pools.
      # Determines pool type to use based on contents of connection
      # specification. Additional options for connection specification:
      #
      # * +pool+: number indicating size of fixed connection pool to use
      # * +wait_timeout+ (optional): number of seconds to block and wait
      #   for a connection before giving up and raising a timeout error.
      def self.create(spec)
        if spec.config[:pool] && spec.config[:pool].to_i > 0
          FixedSizeConnectionPool.new(spec)
        elsif spec.config[:jndi] # JRuby appserver datasource pool; passthrough
          NewConnectionEveryTime.new(spec)
        else
          CachedConnectionPerThread.new(spec)
        end
      end

      delegate :verification_timeout, :to => "::ActiveRecord::Base"
      attr_reader :spec

      def initialize(spec)
        @spec = spec
        # The cache of reserved connections mapped to threads
        @reserved_connections = {}
        # The mutex used to synchronize pool access
        @connection_mutex = Monitor.new
      end

      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the thread id.
      def connection
        if conn = @reserved_connections[active_connection_name]
          conn.verify!(verification_timeout)
          conn
        else
          @reserved_connections[active_connection_name] = checkout
        end
      end

      # Signal that the thread is finished with the current connection.
      # #release_connection releases the connection-thread association
      # and returns the connection to the pool.
      def release_connection
        conn = @reserved_connections.delete(active_connection_name)
        checkin conn if conn
      end

      # Reserve a connection, and yield it to a block. Ensure the connection is
      # checked back in when finished.
      def with_connection
        conn = checkout
        yield conn
      ensure
        checkin conn
      end

      # Returns true if a connection has already been opened.
      def connected?
        !connections.empty?
      end

      # Disconnect all connections in the pool.
      def disconnect!
        @reserved_connections.each do |name,conn|
          checkin conn
        end
        connections.each do |conn|
          conn.disconnect!
        end
        @reserved_connections = {}
      end

      # Clears the cache which maps classes
      def clear_reloadable_connections!
        @reserved_connections.each do |name, conn|
          checkin conn
        end
        @reserved_connections = {}
        connections.each do |conn|
          if conn.requires_reloading?
            conn.disconnect!
            remove_connection conn
          end
        end
      end

      # Verify active connections and remove and disconnect connections
      # associated with stale threads.
      def verify_active_connections! #:nodoc:
        remove_stale_cached_threads!(@reserved_connections) do |name, conn|
          checkin conn
        end
        connections.each do |connection|
          connection.verify!(verification_timeout)
        end
      end

      # Check-out a database connection from the pool.
      def checkout
        raise NotImplementedError, "checkout is an abstract method"
      end

      # Check-in a database connection back into the pool.
      def checkin(connection)
        raise NotImplementedError, "checkin is an abstract method"
      end

      def remove_connection(conn) #:nodoc:
        raise NotImplementedError, "remove_connection is an abstract method"
      end
      private :remove_connection

      def connections #:nodoc:
        raise NotImplementedError, "connections is an abstract method"
      end
      private :connections

      synchronize :connection, :release_connection,
        :clear_reloadable_connections!, :verify_active_connections!,
        :connected?, :disconnect!, :with => :@connection_mutex

      private
      def new_connection
        config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
        ActiveRecord::Base.send(spec.adapter_method, config)
      end

      def active_connection_name #:nodoc:
        Thread.current.object_id
      end

      # Remove stale threads from the cache.
      def remove_stale_cached_threads!(cache, &block)
        keys = Set.new(cache.keys)

        Thread.list.each do |thread|
          keys.delete(thread.object_id) if thread.alive?
        end
        keys.each do |key|
          next unless cache.has_key?(key)
          block.call(key, cache[key])
          cache.delete(key)
        end
      end
    end

    # NewConnectionEveryTime is a simple implementation: always
    # create/disconnect on checkout/checkin.
    class NewConnectionEveryTime < ConnectionPool
      def active_connection
        @reserved_connections[active_connection_name]
      end

      def active_connections; @reserved_connections; end

      def checkout
        new_connection
      end

      def checkin(conn)
        conn.disconnect!
      end

      private
      def connections
        @reserved_connections.values
      end

      def remove_connection(conn)
        @reserved_connections.delete_if {|k,v| v == conn}
      end
    end

    # CachedConnectionPerThread is a compatible pseudo-connection pool that
    # caches connections per-thread. In order to hold onto threads in the same
    # manner as ActiveRecord 2.1 and earlier, it only disconnects the
    # connection when the connection is checked in, or when calling
    # ActiveRecord::Base.clear_all_connections!, and not during
    # #release_connection.
    class CachedConnectionPerThread < NewConnectionEveryTime
      def release_connection
        # no-op; keep the connection
      end
    end

    # FixedSizeConnectionPool provides a full, fixed-size connection pool with
    # timed waits when the pool is exhausted.
    class FixedSizeConnectionPool < ConnectionPool
      def initialize(spec)
        super
        # default 5 second timeout
        @timeout = spec.config[:wait_timeout] || 5
        @size = spec.config[:pool].to_i
        @queue = @connection_mutex.new_cond
        @connections = []
        @checked_out = []
      end

      def checkout
        # Checkout an available connection
        conn = @connection_mutex.synchronize do
          if @checked_out.size < @connections.size
            checkout_existing_connection
          elsif @connections.size < @size
            checkout_new_connection
          end
        end
        return conn if conn

        # No connections available; wait for one
        @connection_mutex.synchronize do
          if @queue.wait(@timeout)
            checkout_existing_connection
          else
            raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion"
          end
        end
      end

      def checkin(conn)
        @connection_mutex.synchronize do
          @checked_out.delete conn
          @queue.signal
        end
      end

      private
      def checkout_new_connection
        c = new_connection
        @connections << c
        @checked_out << c
        c
      end

      def checkout_existing_connection
        c = (@connections - @checked_out).first
        @checked_out << c
        c
      end

      def connections
        @connections
      end

      def remove_connection(conn)
        @connections.delete conn
      end
    end

    module ConnectionHandlerMethods
      def initialize(pools = {})
        @connection_pools = pools
      end

      def connection_pools
        @connection_pools ||= {}
      end

      def establish_connection(name, spec)
        @connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec)
      end

      # for internal use only and for testing;
      # only works with ConnectionPerThread pool class
      def active_connections #:nodoc:
        @connection_pools.inject({}) do |hash,kv|
          hash[kv.first] = kv.last.active_connection
          hash.delete(kv.first) unless hash[kv.first]
          hash
        end
      end

      # Clears the cache which maps classes to connections.
      def clear_active_connections!
        @connection_pools.each_value {|pool| pool.release_connection }
      end

      # Clears the cache which maps classes
      def clear_reloadable_connections!
        @connection_pools.each_value {|pool| pool.clear_reloadable_connections! }
      end

      def clear_all_connections!
        @connection_pools.each_value {|pool| pool.disconnect! }
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
        @connection_pools.each_value {|pool| pool.verify_active_connections! }
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) or raise ConnectionNotEstablished
      end

      # Returns true if a connection that's accessible to this class has
      # already been opened.
      def connected?(klass)
        retrieve_connection_pool(klass).connected?
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      def remove_connection(klass)
        pool = @connection_pools[klass.name]
        @connection_pools.delete_if { |key, value| value == pool }
        pool.disconnect! if pool
        pool.spec.config if pool
      end

      def retrieve_connection_pool(klass)
        loop do
          pool = @connection_pools[klass.name]
          return pool if pool
          return nil if ActiveRecord::Base == klass
          klass = klass.superclass
        end
      end
    end

    # This connection handler is not thread-safe, as it does not protect access
    # to the underlying connection pools.
    class SingleThreadConnectionHandler
      include ConnectionHandlerMethods
    end

    # This connection handler is thread-safe. Each access or modification of a thread
    # pool is synchronized by an internal monitor.
    class MultipleThreadConnectionHandler
      attr_reader :connection_pools_lock
      include ConnectionHandlerMethods

      def initialize(pools = {})
        super
        @connection_pools_lock = Monitor.new
      end

      # Apply monitor to all public methods that access the pool.
      synchronize :establish_connection, :retrieve_connection,
        :connected?, :remove_connection, :active_connections,
        :clear_active_connections!, :clear_reloadable_connections!,
        :clear_all_connections!, :verify_active_connections!,
        :with => :connection_pools_lock
    end
  end
end