aboutsummaryrefslogblamecommitdiffstats
path: root/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
blob: ca9f0a5d9bbb711ae8fb2c497bf2bef830fb4123 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                 

             


                           
                                                                   


                                            




                                                   


                                                   





























                                                                                     
                                           














                                                                         
                                                  








                                                                    
                                            
            
                                  










                                                                                             
                                               






                                                         
                                       


                                                                           

                                                                                                        




                                        



                                                                                          

















                                                           
                                           




                                       
 



                                   
 

                                


                                          
                                                                              





















                                                                                 
                                                               















                                                                             















                                                                                             
             






                                                     

           
























                                                                                     

     
require 'monitor'
require 'set'

module ActiveRecord
  module ConnectionAdapters
    class ConnectionPool
      delegate :verification_timeout, :to => "::ActiveRecord::Base"
      attr_reader :active_connections, :spec

      def initialize(spec)
        # The thread id -> adapter cache.
        @active_connections = {}

        # The ConnectionSpecification for this pool
        @spec = spec

        # The mutex used to synchronize pool access
        @connection_mutex = Monitor.new
      end

      def active_connection_name #:nodoc:
        Thread.current.object_id
      end

      def active_connection
        active_connections[active_connection_name]
      end

      # Returns the connection currently associated with the class. This can
      # also be used to "borrow" the connection to do database work unrelated
      # to any of the specific Active Records.
      def connection
        if conn = active_connections[active_connection_name]
          conn
        else
          # retrieve_connection sets the cache key.
          conn = retrieve_connection
          active_connections[active_connection_name] = conn
        end
      end

      # Clears the cache which maps classes to connections.
      def clear_active_connections!
        clear_entries!(@active_connections, [active_connection_name]) do |name, conn|
          conn.disconnect!
        end
      end

      # Clears the cache which maps classes
      def clear_reloadable_connections!
        @active_connections.each do |name, conn|
          if conn.requires_reloading?
            conn.disconnect!
            @active_connections.delete(name)
          end
        end
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
        remove_stale_cached_threads!(@active_connections) do |name, conn|
          conn.disconnect!
        end
        active_connections.each_value do |connection|
          connection.verify!(verification_timeout)
        end
      end

      def retrieve_connection #:nodoc:
        # Name is nil if establish_connection hasn't been called for
        # some class along the inheritance chain up to AR::Base yet.
        name = active_connection_name
        if conn = active_connections[name]
          # Verify the connection.
          conn.verify!(verification_timeout)
        else
          self.set_connection spec
          conn = active_connections[name]
        end

        conn or raise ConnectionNotEstablished
      end

      # Returns true if a connection that's accessible to this class has already been opened.
      def connected?
        active_connections[active_connection_name] ? true : false
      end

      # Disconnect all connections in the pool.
      def disconnect!
        clear_cache!(@active_connections) do |name, conn|
          conn.disconnect!
        end
      end

      # Set the connection for the class.
      def set_connection(spec) #:nodoc:
        if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
          active_connections[active_connection_name] = spec
        elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
          config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
          self.set_connection ActiveRecord::Base.send(spec.adapter_method, config)
        else
          raise ConnectionNotEstablished
        end
      end

      synchronize :active_connection, :connection, :clear_active_connections!,
        :clear_reloadable_connections!, :verify_active_connections!, :retrieve_connection,
        :connected?, :disconnect!, :set_connection, :with => :@connection_mutex

      private
        def clear_cache!(cache, &block)
          cache.each(&block) if block_given?
          cache.clear
        end

        # Remove stale threads from the cache.
        def remove_stale_cached_threads!(cache, &block)
          stale = Set.new(cache.keys)

          Thread.list.each do |thread|
            stale.delete(thread.object_id) if thread.alive?
          end
          clear_entries!(cache, stale, &block)
        end

        def clear_entries!(cache, keys, &block)
          keys.each do |key|
            next unless cache.has_key?(key)
            block.call(key, cache[key])
            cache.delete(key)
          end
        end
    end

    module ConnectionHandlerMethods
      def initialize(pools = {})
        @connection_pools = pools
      end

      def connection_pools
        @connection_pools ||= {}
      end

      def establish_connection(name, spec)
        @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
      end

      # for internal use only and for testing
      def active_connections #:nodoc:
        @connection_pools.inject({}) do |hash,kv|
          hash[kv.first] = kv.last.active_connection
          hash.delete(kv.first) unless hash[kv.first]
          hash
        end
      end

      # Clears the cache which maps classes to connections.
      def clear_active_connections!
        @connection_pools.each_value {|pool| pool.clear_active_connections! }
      end

      # Clears the cache which maps classes
      def clear_reloadable_connections!
        @connection_pools.each_value {|pool| pool.clear_reloadable_connections! }
      end

      def clear_all_connections!
        @connection_pools.each_value {|pool| pool.disconnect! }
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
        @connection_pools.each_value {|pool| pool.verify_active_connections!}
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) or raise ConnectionNotEstablished
      end

      # Returns true if a connection that's accessible to this class has already been opened.
      def connected?(klass)
        retrieve_connection_pool(klass).connected?
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      def remove_connection(klass)
        pool = @connection_pools[klass.name]
        @connection_pools.delete_if { |key, value| value == pool }
        pool.disconnect! if pool
        pool.spec.config if pool
      end

      private
        def retrieve_connection_pool(klass)
          loop do
            pool = @connection_pools[klass.name]
            return pool if pool
            return nil if ActiveRecord::Base == klass
            klass = klass.superclass
          end
        end
    end

    # This connection handler is not thread-safe, as it does not protect access
    # to the underlying connection pools.
    class SingleThreadConnectionHandler
      include ConnectionHandlerMethods
    end

    # This connection handler is thread-safe. Each access or modification of a thread
    # pool is synchronized by an internal monitor.
    class MultipleThreadConnectionHandler
      attr_reader :connection_pools_lock
      include ConnectionHandlerMethods

      def initialize(pools = {})
        super
        @connection_pools_lock = Monitor.new
      end

      # Apply monitor to all public methods that access the pool.
      synchronize :establish_connection, :retrieve_connection,
        :connected?, :remove_connection, :active_connections,
        :clear_active_connections!, :clear_reloadable_connections!,
        :clear_all_connections!, :verify_active_connections!,
        :with => :connection_pools_lock
    end
  end
end