require 'monitor' require 'set' module ActiveRecord module ConnectionAdapters class ConnectionPool delegate :verification_timeout, :to => "::ActiveRecord::Base" attr_reader :active_connections, :spec def initialize(spec) # The thread id -> adapter cache. @active_connections = {} # The ConnectionSpecification for this pool @spec = spec # The mutex used to synchronize pool access @connection_mutex = Monitor.new end def active_connection_name #:nodoc: Thread.current.object_id end def active_connection active_connections[active_connection_name] end # Returns the connection currently associated with the class. This can # also be used to "borrow" the connection to do database work unrelated # to any of the specific Active Records. def connection if conn = active_connections[active_connection_name] conn else # retrieve_connection sets the cache key. conn = retrieve_connection active_connections[active_connection_name] = conn end end # Clears the cache which maps classes to connections. def clear_active_connections! clear_entries!(@active_connections, [active_connection_name]) do |name, conn| conn.disconnect! end end # Clears the cache which maps classes def clear_reloadable_connections! @active_connections.each do |name, conn| if conn.requires_reloading? conn.disconnect! @active_connections.delete(name) end end end # Verify active connections. def verify_active_connections! #:nodoc: remove_stale_cached_threads!(@active_connections) do |name, conn| conn.disconnect! end active_connections.each_value do |connection| connection.verify!(verification_timeout) end end def retrieve_connection #:nodoc: # Name is nil if establish_connection hasn't been called for # some class along the inheritance chain up to AR::Base yet. name = active_connection_name if conn = active_connections[name] # Verify the connection. conn.verify!(verification_timeout) else self.set_connection spec conn = active_connections[name] end conn or raise ConnectionNotEstablished end # Returns true if a connection that's accessible to this class has already been opened. def connected? active_connections[active_connection_name] ? true : false end # Disconnect all connections in the pool. def disconnect! clear_cache!(@active_connections) do |name, conn| conn.disconnect! end end # Set the connection for the class. def set_connection(spec) #:nodoc: if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter) active_connections[active_connection_name] = spec elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification) config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency) self.set_connection ActiveRecord::Base.send(spec.adapter_method, config) else raise ConnectionNotEstablished end end synchronize :active_connection, :connection, :clear_active_connections!, :clear_reloadable_connections!, :verify_active_connections!, :retrieve_connection, :connected?, :disconnect!, :set_connection, :with => :@connection_mutex private def clear_cache!(cache, &block) cache.each(&block) if block_given? cache.clear end # Remove stale threads from the cache. def remove_stale_cached_threads!(cache, &block) stale = Set.new(cache.keys) Thread.list.each do |thread| stale.delete(thread.object_id) if thread.alive? end clear_entries!(cache, stale, &block) end def clear_entries!(cache, keys, &block) keys.each do |key| next unless cache.has_key?(key) block.call(key, cache[key]) cache.delete(key) end end end module ConnectionHandlerMethods def initialize(pools = {}) @connection_pools = pools end def connection_pools @connection_pools ||= {} end def establish_connection(name, spec) @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec) end # for internal use only and for testing def active_connections #:nodoc: @connection_pools.inject({}) do |hash,kv| hash[kv.first] = kv.last.active_connection hash.delete(kv.first) unless hash[kv.first] hash end end # Clears the cache which maps classes to connections. def clear_active_connections! @connection_pools.each_value {|pool| pool.clear_active_connections! } end # Clears the cache which maps classes def clear_reloadable_connections! @connection_pools.each_value {|pool| pool.clear_reloadable_connections! } end def clear_all_connections! @connection_pools.each_value {|pool| pool.disconnect! } end # Verify active connections. def verify_active_connections! #:nodoc: @connection_pools.each_value {|pool| pool.verify_active_connections!} end # Locate the connection of the nearest super class. This can be an # active or defined connection: if it is the latter, it will be # opened and set as the active connection for the class it was defined # for (not necessarily the current class). def retrieve_connection(klass) #:nodoc: pool = retrieve_connection_pool(klass) (pool && pool.connection) or raise ConnectionNotEstablished end # Returns true if a connection that's accessible to this class has already been opened. def connected?(klass) retrieve_connection_pool(klass).connected? end # Remove the connection for this class. This will close the active # connection and the defined connection (if they exist). The result # can be used as an argument for establish_connection, for easily # re-establishing the connection. def remove_connection(klass) pool = @connection_pools[klass.name] @connection_pools.delete_if { |key, value| value == pool } pool.disconnect! if pool pool.spec.config if pool end private def retrieve_connection_pool(klass) loop do pool = @connection_pools[klass.name] return pool if pool return nil if ActiveRecord::Base == klass klass = klass.superclass end end end # This connection handler is not thread-safe, as it does not protect access # to the underlying connection pools. class SingleThreadConnectionHandler include ConnectionHandlerMethods end # This connection handler is thread-safe. Each access or modification of a thread # pool is synchronized by an internal monitor. class MultipleThreadConnectionHandler attr_reader :connection_pools_lock include ConnectionHandlerMethods def initialize(pools = {}) super @connection_pools_lock = Monitor.new end # Apply monitor to all public methods that access the pool. synchronize :establish_connection, :retrieve_connection, :connected?, :remove_connection, :active_connections, :clear_active_connections!, :clear_reloadable_connections!, :clear_all_connections!, :verify_active_connections!, :with => :connection_pools_lock end end end