aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord/lib
diff options
context:
space:
mode:
Diffstat (limited to 'activerecord/lib')
-rw-r--r--activerecord/lib/active_record/base.rb20
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb281
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb242
-rwxr-xr-x[-rw-r--r--]activerecord/lib/active_record/connection_adapters/abstract_adapter.rb19
-rw-r--r--activerecord/lib/active_record/connection_adapters/mysql_adapter.rb14
-rw-r--r--activerecord/lib/active_record/fixtures.rb2
6 files changed, 346 insertions, 232 deletions
diff --git a/activerecord/lib/active_record/base.rb b/activerecord/lib/active_record/base.rb
index b5ffc471bc..bc6d61301f 100644
--- a/activerecord/lib/active_record/base.rb
+++ b/activerecord/lib/active_record/base.rb
@@ -452,13 +452,6 @@ module ActiveRecord #:nodoc:
cattr_accessor :default_timezone, :instance_writer => false
@@default_timezone = :local
- # Determines whether to use a connection for each thread, or a single shared connection for all threads.
- # Defaults to false. If you're writing a threaded application, set to true
- # and periodically call verify_active_connections! to clear out connections
- # assigned to stale threads.
- cattr_accessor :allow_concurrency, :instance_writer => false
- @@allow_concurrency = false
-
# Specifies the format to use when dumping the database schema with Rails'
# Rakefile. If :sql, the schema is dumped as (potentially database-
# specific) SQL statements. If :ruby, the schema is dumped as an
@@ -1943,22 +1936,11 @@ module ActiveRecord #:nodoc:
end
end
- def thread_safe_scoped_methods #:nodoc:
+ def scoped_methods #:nodoc:
scoped_methods = (Thread.current[:scoped_methods] ||= {})
scoped_methods[self] ||= []
end
- def single_threaded_scoped_methods #:nodoc:
- @scoped_methods ||= []
- end
-
- # pick up the correct scoped_methods version from @@allow_concurrency
- if @@allow_concurrency
- alias_method :scoped_methods, :thread_safe_scoped_methods
- else
- alias_method :scoped_methods, :single_threaded_scoped_methods
- end
-
def current_scoped_methods #:nodoc:
scoped_methods.last
end
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
new file mode 100644
index 0000000000..838b0434b0
--- /dev/null
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -0,0 +1,281 @@
+require 'monitor'
+require 'set'
+
+module ActiveRecord
+ # Raised when a connection could not be obtained within the connection
+ # acquisition timeout period.
+ class ConnectionTimeoutError < ConnectionNotEstablished
+ end
+
+ module ConnectionAdapters
+ # Connection pool base class for managing ActiveRecord database
+ # connections.
+ #
+ # Connections can be obtained and used from a connection pool in several
+ # ways:
+ #
+ # 1. Simply use ActiveRecord::Base.connection as with ActiveRecord 2.1 and
+ # earlier (pre-connection-pooling). Eventually, when you're done with
+ # the connection(s) and wish it to be returned to the pool, you call
+ # ActiveRecord::Base.clear_active_connections!. This will be the
+ # default behavior for ActiveRecord when used in conjunction with
+ # ActionPack's request handling cycle.
+ # 2. Manually check out a connection from the pool with
+ # ActiveRecord::Base.connection_pool.checkout. You are responsible for
+ # returning this connection to the pool when finished by calling
+ # ActiveRecord::Base.connection_pool.checkin(connection).
+ # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
+ # obtains a connection, yields it as the sole argument to the block,
+ # and returns it to the pool after the block completes.
+ #
+ # There are two connection-pooling-related options that you can add to
+ # your database connection configuration:
+ #
+ # * +pool+: number indicating size of connection pool (default 5)
+ # * +wait_timeout+: number of seconds to block and wait for a connection
+ # before giving up and raising a timeout error (default 5 seconds).
+ class ConnectionPool
+ delegate :verification_timeout, :to => "::ActiveRecord::Base"
+ attr_reader :spec
+
+ def initialize(spec)
+ @spec = spec
+ # The cache of reserved connections mapped to threads
+ @reserved_connections = {}
+ # The mutex used to synchronize pool access
+ @connection_mutex = Monitor.new
+ @queue = @connection_mutex.new_cond
+ # default 5 second timeout
+ @timeout = spec.config[:wait_timeout] || 5
+ # default max pool size to 5
+ @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
+ @connections = []
+ @checked_out = []
+ end
+
+ # Retrieve the connection associated with the current thread, or call
+ # #checkout to obtain one if necessary.
+ #
+ # #connection can be called any number of times; the connection is
+ # held in a hash keyed by the thread id.
+ def connection
+ if conn = @reserved_connections[current_connection_id]
+ conn.verify!(verification_timeout)
+ conn
+ else
+ @reserved_connections[current_connection_id] = checkout
+ end
+ end
+
+ # Signal that the thread is finished with the current connection.
+ # #release_connection releases the connection-thread association
+ # and returns the connection to the pool.
+ def release_connection
+ conn = @reserved_connections.delete(current_connection_id)
+ checkin conn if conn
+ end
+
+ # Reserve a connection, and yield it to a block. Ensure the connection is
+ # checked back in when finished.
+ def with_connection
+ conn = checkout
+ yield conn
+ ensure
+ checkin conn
+ end
+
+ # Returns true if a connection has already been opened.
+ def connected?
+ !@connections.empty?
+ end
+
+ # Disconnect all connections in the pool.
+ def disconnect!
+ @reserved_connections.each do |name,conn|
+ checkin conn
+ end
+ @reserved_connections = {}
+ @connections.each do |conn|
+ conn.disconnect!
+ end
+ @connections = []
+ end
+
+ # Clears the cache which maps classes
+ def clear_reloadable_connections!
+ @reserved_connections.each do |name, conn|
+ checkin conn
+ end
+ @reserved_connections = {}
+ @connections.each do |conn|
+ conn.disconnect! if conn.requires_reloading?
+ end
+ @connections = []
+ end
+
+ # Verify active connections and remove and disconnect connections
+ # associated with stale threads.
+ def verify_active_connections! #:nodoc:
+ clear_stale_cached_connections!
+ @connections.each do |connection|
+ connection.verify!(verification_timeout)
+ end
+ end
+
+ # Return any checked-out connections back to the pool by threads that
+ # are no longer alive.
+ def clear_stale_cached_connections!
+ remove_stale_cached_threads!(@reserved_connections) do |name, conn|
+ checkin conn
+ end
+ end
+
+ # Check-out a database connection from the pool.
+ def checkout
+ # Checkout an available connection
+ conn = @connection_mutex.synchronize do
+ if @checked_out.size < @connections.size
+ checkout_existing_connection
+ elsif @connections.size < @size
+ checkout_new_connection
+ end
+ end
+ return conn if conn
+
+ # No connections available; wait for one
+ @connection_mutex.synchronize do
+ if @queue.wait(@timeout)
+ checkout_existing_connection
+ else
+ raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion"
+ end
+ end
+ end
+
+ # Check-in a database connection back into the pool.
+ def checkin(conn)
+ @connection_mutex.synchronize do
+ conn.run_callbacks :checkin
+ @checked_out.delete conn
+ @queue.signal
+ end
+ end
+
+ synchronize :clear_reloadable_connections!, :verify_active_connections!,
+ :connected?, :disconnect!, :with => :@connection_mutex
+
+ private
+ def new_connection
+ config = spec.config.reverse_merge(:allow_concurrency => true)
+ ActiveRecord::Base.send(spec.adapter_method, config)
+ end
+
+ def current_connection_id #:nodoc:
+ Thread.current.object_id
+ end
+
+ # Remove stale threads from the cache.
+ def remove_stale_cached_threads!(cache, &block)
+ keys = Set.new(cache.keys)
+
+ Thread.list.each do |thread|
+ keys.delete(thread.object_id) if thread.alive?
+ end
+ keys.each do |key|
+ next unless cache.has_key?(key)
+ block.call(key, cache[key])
+ cache.delete(key)
+ end
+ end
+
+ def checkout_new_connection
+ c = new_connection
+ @connections << c
+ checkout_and_verify(c)
+ end
+
+ def checkout_existing_connection
+ c = (@connections - @checked_out).first
+ checkout_and_verify(c)
+ end
+
+ def checkout_and_verify(c)
+ c.run_callbacks :checkout
+ c.verify!(verification_timeout)
+ @checked_out << c
+ c
+ end
+ end
+
+ class ConnectionHandler
+ def initialize(pools = {})
+ @connection_pools = pools
+ end
+
+ def connection_pools
+ @connection_pools ||= {}
+ end
+
+ def establish_connection(name, spec)
+ @connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
+ end
+
+ # Returns any connections in use by the current thread back to the pool,
+ # and also returns connections to the pool cached by threads that are no
+ # longer alive.
+ def clear_active_connections!
+ @connection_pools.each_value do |pool|
+ pool.release_connection
+ pool.clear_stale_cached_connections!
+ end
+ end
+
+ # Clears the cache which maps classes
+ def clear_reloadable_connections!
+ @connection_pools.each_value {|pool| pool.clear_reloadable_connections! }
+ end
+
+ def clear_all_connections!
+ @connection_pools.each_value {|pool| pool.disconnect! }
+ end
+
+ # Verify active connections.
+ def verify_active_connections! #:nodoc:
+ @connection_pools.each_value {|pool| pool.verify_active_connections! }
+ end
+
+ # Locate the connection of the nearest super class. This can be an
+ # active or defined connection: if it is the latter, it will be
+ # opened and set as the active connection for the class it was defined
+ # for (not necessarily the current class).
+ def retrieve_connection(klass) #:nodoc:
+ pool = retrieve_connection_pool(klass)
+ (pool && pool.connection) or raise ConnectionNotEstablished
+ end
+
+ # Returns true if a connection that's accessible to this class has
+ # already been opened.
+ def connected?(klass)
+ retrieve_connection_pool(klass).connected?
+ end
+
+ # Remove the connection for this class. This will close the active
+ # connection and the defined connection (if they exist). The result
+ # can be used as an argument for establish_connection, for easily
+ # re-establishing the connection.
+ def remove_connection(klass)
+ pool = @connection_pools[klass.name]
+ @connection_pools.delete_if { |key, value| value == pool }
+ pool.disconnect! if pool
+ pool.spec.config if pool
+ end
+
+ def retrieve_connection_pool(klass)
+ pool = @connection_pools[klass.name]
+ return pool if pool
+ return nil if ActiveRecord::Base == klass
+ retrieve_connection_pool klass.superclass
+ end
+ end
+ end
+end \ No newline at end of file
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb
index 2a8807fb78..417a333aab 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb
@@ -1,5 +1,3 @@
-require 'set'
-
module ActiveRecord
class Base
class ConnectionSpecification #:nodoc:
@@ -14,158 +12,9 @@ module ActiveRecord
cattr_accessor :verification_timeout, :instance_writer => false
@@verification_timeout = 0
- # The class -> [adapter_method, config] map
- @@defined_connections = {}
-
- # The class -> thread id -> adapter cache. (class -> adapter if not allow_concurrency)
- @@active_connections = {}
-
- class << self
- # Retrieve the connection cache.
- def thread_safe_active_connections #:nodoc:
- @@active_connections[Thread.current.object_id] ||= {}
- end
-
- def single_threaded_active_connections #:nodoc:
- @@active_connections
- end
-
- # pick up the right active_connection method from @@allow_concurrency
- if @@allow_concurrency
- alias_method :active_connections, :thread_safe_active_connections
- else
- alias_method :active_connections, :single_threaded_active_connections
- end
-
- # set concurrency support flag (not thread safe, like most of the methods in this file)
- def allow_concurrency=(threaded) #:nodoc:
- logger.debug "allow_concurrency=#{threaded}" if logger
- return if @@allow_concurrency == threaded
- clear_all_cached_connections!
- @@allow_concurrency = threaded
- method_prefix = threaded ? "thread_safe" : "single_threaded"
- sing = (class << self; self; end)
- [:active_connections, :scoped_methods].each do |method|
- sing.send(:alias_method, method, "#{method_prefix}_#{method}")
- end
- log_connections if logger
- end
-
- def active_connection_name #:nodoc:
- @active_connection_name ||=
- if active_connections[name] || @@defined_connections[name]
- name
- elsif self == ActiveRecord::Base
- nil
- else
- superclass.active_connection_name
- end
- end
-
- def clear_active_connection_name #:nodoc:
- @active_connection_name = nil
- subclasses.each { |klass| klass.clear_active_connection_name }
- end
-
- # Returns the connection currently associated with the class. This can
- # also be used to "borrow" the connection to do database work unrelated
- # to any of the specific Active Records.
- def connection
- if defined?(@active_connection_name) && (conn = active_connections[@active_connection_name])
- conn
- else
- # retrieve_connection sets the cache key.
- conn = retrieve_connection
- active_connections[@active_connection_name] = conn
- end
- end
-
- # Clears the cache which maps classes to connections.
- def clear_active_connections!
- clear_cache!(@@active_connections) do |name, conn|
- conn.disconnect!
- end
- end
-
- # Clears the cache which maps classes
- def clear_reloadable_connections!
- if @@allow_concurrency
- # With concurrent connections @@active_connections is
- # a hash keyed by thread id.
- @@active_connections.each do |thread_id, conns|
- conns.each do |name, conn|
- if conn.requires_reloading?
- conn.disconnect!
- @@active_connections[thread_id].delete(name)
- end
- end
- end
- else
- @@active_connections.each do |name, conn|
- if conn.requires_reloading?
- conn.disconnect!
- @@active_connections.delete(name)
- end
- end
- end
- end
-
- # Verify active connections.
- def verify_active_connections! #:nodoc:
- if @@allow_concurrency
- remove_stale_cached_threads!(@@active_connections) do |name, conn|
- conn.disconnect!
- end
- end
-
- active_connections.each_value do |connection|
- connection.verify!(@@verification_timeout)
- end
- end
-
- private
- def clear_cache!(cache, thread_id = nil, &block)
- if cache
- if @@allow_concurrency
- thread_id ||= Thread.current.object_id
- thread_cache, cache = cache, cache[thread_id]
- return unless cache
- end
-
- cache.each(&block) if block_given?
- cache.clear
- end
- ensure
- if thread_cache && @@allow_concurrency
- thread_cache.delete(thread_id)
- end
- end
-
- # Remove stale threads from the cache.
- def remove_stale_cached_threads!(cache, &block)
- stale = Set.new(cache.keys)
-
- Thread.list.each do |thread|
- stale.delete(thread.object_id) if thread.alive?
- end
-
- stale.each do |thread_id|
- clear_cache!(cache, thread_id, &block)
- end
- end
-
- def clear_all_cached_connections!
- if @@allow_concurrency
- @@active_connections.each_value do |connection_hash_for_thread|
- connection_hash_for_thread.each_value {|conn| conn.disconnect! }
- connection_hash_for_thread.clear
- end
- else
- @@active_connections.each_value {|conn| conn.disconnect! }
- end
- @@active_connections.clear
- end
- end
+ # The connection handler
+ cattr_accessor :connection_handler, :instance_writer => false
+ @@connection_handler = ConnectionAdapters::ConnectionHandler.new
# Returns the connection currently associated with the class. This can
# also be used to "borrow" the connection to do database work that isn't
@@ -208,9 +57,7 @@ module ActiveRecord
raise AdapterNotSpecified unless defined? RAILS_ENV
establish_connection(RAILS_ENV)
when ConnectionSpecification
- clear_active_connection_name
- @active_connection_name = name
- @@defined_connections[name] = spec
+ @@connection_handler.establish_connection(name, spec)
when Symbol, String
if configuration = configurations[spec.to_s]
establish_connection(configuration)
@@ -243,67 +90,42 @@ module ActiveRecord
end
end
- # Locate the connection of the nearest super class. This can be an
- # active or defined connection: if it is the latter, it will be
- # opened and set as the active connection for the class it was defined
- # for (not necessarily the current class).
- def self.retrieve_connection #:nodoc:
- # Name is nil if establish_connection hasn't been called for
- # some class along the inheritance chain up to AR::Base yet.
- if name = active_connection_name
- if conn = active_connections[name]
- # Verify the connection.
- conn.verify!(@@verification_timeout)
- elsif spec = @@defined_connections[name]
- # Activate this connection specification.
- klass = name.constantize
- klass.connection = spec
- conn = active_connections[name]
- end
+ class << self
+ # Deprecated and no longer has any effect.
+ def allow_concurrency
+ ActiveSupport::Deprecation.warn("ActiveRecord::Base.allow_concurrency has been deprecated and no longer has any effect. Please remove all references to allow_concurrency.")
end
- conn or raise ConnectionNotEstablished
- end
+ # Deprecated and no longer has any effect.
+ def allow_concurrency=(flag)
+ ActiveSupport::Deprecation.warn("ActiveRecord::Base.allow_concurrency= has been deprecated and no longer has any effect. Please remove all references to allow_concurrency=.")
+ end
- # Returns true if a connection that's accessible to this class has already been opened.
- def self.connected?
- active_connections[active_connection_name] ? true : false
- end
+ # Returns the connection currently associated with the class. This can
+ # also be used to "borrow" the connection to do database work unrelated
+ # to any of the specific Active Records.
+ def connection
+ retrieve_connection
+ end
- # Remove the connection for this class. This will close the active
- # connection and the defined connection (if they exist). The result
- # can be used as an argument for establish_connection, for easily
- # re-establishing the connection.
- def self.remove_connection(klass=self)
- spec = @@defined_connections[klass.name]
- konn = active_connections[klass.name]
- @@defined_connections.delete_if { |key, value| value == spec }
- active_connections.delete_if { |key, value| value == konn }
- konn.disconnect! if konn
- spec.config if spec
- end
+ def connection_pool
+ connection_handler.retrieve_connection_pool(self)
+ end
- # Set the connection for the class.
- def self.connection=(spec) #:nodoc:
- if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
- active_connections[name] = spec
- elsif spec.kind_of?(ConnectionSpecification)
- config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
- self.connection = self.send(spec.adapter_method, config)
- elsif spec.nil?
- raise ConnectionNotEstablished
- else
- establish_connection spec
+ def retrieve_connection
+ connection_handler.retrieve_connection(self)
end
- end
- # connection state logging
- def self.log_connections #:nodoc:
- if logger
- logger.info "Defined connections: #{@@defined_connections.inspect}"
- logger.info "Active connections: #{active_connections.inspect}"
- logger.info "Active connection name: #{@active_connection_name}"
+ def connected?
+ connection_handler.connected?(self)
end
+
+ def remove_connection(klass = self)
+ connection_handler.remove_connection(klass)
+ end
+
+ delegate :clear_active_connections!, :clear_reloadable_connections!,
+ :clear_all_connections!,:verify_active_connections!, :to => :connection_handler
end
end
end
diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
index 6924bb7e6f..005be9d72f 100644..100755
--- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
@@ -7,6 +7,7 @@ require 'active_record/connection_adapters/abstract/schema_definitions'
require 'active_record/connection_adapters/abstract/schema_statements'
require 'active_record/connection_adapters/abstract/database_statements'
require 'active_record/connection_adapters/abstract/quoting'
+require 'active_record/connection_adapters/abstract/connection_pool'
require 'active_record/connection_adapters/abstract/connection_specification'
require 'active_record/connection_adapters/abstract/query_cache'
@@ -24,6 +25,9 @@ module ActiveRecord
class AbstractAdapter
include Quoting, DatabaseStatements, SchemaStatements
include QueryCache
+ include ActiveSupport::Callbacks
+ define_callbacks :checkout, :checkin
+ checkout :reset!
@@row_even = true
def initialize(connection, logger = nil) #:nodoc:
@@ -102,14 +106,25 @@ module ActiveRecord
@active = false
end
+ # Reset the state of this connection, directing the DBMS to clear
+ # transactions and other connection-related server-side state. Usually a
+ # database-dependent operation; the default method simply executes a
+ # ROLLBACK and swallows any exceptions which is probably not enough to
+ # ensure the connection is clean.
+ def reset!
+ silence_stderr do # postgres prints on stderr when you do this w/o a txn
+ execute "ROLLBACK" rescue nil
+ end
+ end
+
# Returns true if its safe to reload the connection between requests for development mode.
# This is not the case for Ruby/MySQL and it's not necessary for any adapters except SQLite.
def requires_reloading?
false
end
- # Lazily verify this connection, calling <tt>active?</tt> only if it hasn't
- # been called for +timeout+ seconds.
+ # Lazily verify this connection, calling <tt>active?</tt> only if it
+ # hasn't been called for +timeout+ seconds.
def verify!(timeout)
now = Time.now.to_i
if (now - @last_verification) > timeout
diff --git a/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb
index 204ebaa2e2..14c76ac455 100644
--- a/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb
+++ b/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb
@@ -280,6 +280,16 @@ module ActiveRecord
@connection.close rescue nil
end
+ def reset!
+ if @connection.respond_to?(:change_user)
+ # See http://bugs.mysql.com/bug.php?id=33540 -- the workaround way to
+ # reset the connection is to change the user to the same user.
+ @connection.change_user(@config[:username], @config[:password], @config[:database])
+ configure_connection
+ else
+ super
+ end
+ end
# DATABASE STATEMENTS ======================================
@@ -529,7 +539,11 @@ module ActiveRecord
end
@connection.real_connect(*@connection_options)
+ configure_connection
+ end
+ def configure_connection
+ encoding = @config[:encoding]
execute("SET NAMES '#{encoding}'") if encoding
# By default, MySQL 'where id is null' selects the last inserted id.
diff --git a/activerecord/lib/active_record/fixtures.rb b/activerecord/lib/active_record/fixtures.rb
index 622cfc3c3f..114141a646 100644
--- a/activerecord/lib/active_record/fixtures.rb
+++ b/activerecord/lib/active_record/fixtures.rb
@@ -955,7 +955,7 @@ module Test #:nodoc:
ActiveRecord::Base.connection.rollback_db_transaction
ActiveRecord::Base.connection.decrement_open_transactions
end
- ActiveRecord::Base.verify_active_connections!
+ ActiveRecord::Base.clear_active_connections!
end
private