diff options
Diffstat (limited to 'activerecord/lib/active_record/connection_adapters')
18 files changed, 673 insertions, 279 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 77e64a22be..6535121075 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -1,7 +1,6 @@ require 'thread' require 'thread_safe' require 'monitor' -require 'set' module ActiveRecord # Raised when a connection could not be obtained within the connection @@ -10,6 +9,12 @@ module ActiveRecord class ConnectionTimeoutError < ConnectionNotEstablished end + # Raised when a pool was unable to get ahold of all its connections + # to perform a "group" action such as +ConnectionPool#disconnect!+ + # or +ConnectionPool#clear_reloadable_connections!+. + class ExclusiveConnectionTimeoutError < ConnectionTimeoutError + end + module ConnectionAdapters # Connection pool base class for managing Active Record database # connections. @@ -63,6 +68,15 @@ module ActiveRecord # connection at the end of a thread or a thread dies unexpectedly. # Regardless of this setting, the Reaper will be invoked before every # blocking wait. (Default nil, which means don't schedule the Reaper). + # + #-- + # Synchronization policy: + # * all public methods can be called outside +synchronize+ + # * access to these i-vars needs to be in +synchronize+: + # * @connections + # * @now_connecting + # * private methods that require being called in a +synchronize+ blocks + # are now explicitly documented class ConnectionPool # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool # with which it shares a Monitor. But could be a generic Queue. @@ -129,17 +143,15 @@ module ActiveRecord # - ConnectionTimeoutError if +timeout+ is given and no element # becomes available within +timeout+ seconds, def poll(timeout = nil) - synchronize do - if timeout - no_wait_poll || wait_poll(timeout) - else - no_wait_poll - end - end + synchronize { internal_poll(timeout) } end private + def internal_poll(timeout) + no_wait_poll || (timeout && wait_poll(timeout)) + end + def synchronize(&block) @lock.synchronize(&block) end @@ -193,6 +205,80 @@ module ActiveRecord end end + # Adds the ability to turn a basic fair FIFO queue into one + # biased to some thread. + module BiasableQueue # :nodoc: + class BiasedConditionVariable # :nodoc: + # semantics of condition variables guarantee that +broadcast+, +broadcast_on_biased+, + # +signal+ and +wait+ methods are only called while holding a lock + def initialize(lock, other_cond, preferred_thread) + @real_cond = lock.new_cond + @other_cond = other_cond + @preferred_thread = preferred_thread + @num_waiting_on_real_cond = 0 + end + + def broadcast + broadcast_on_biased + @other_cond.broadcast + end + + def broadcast_on_biased + @num_waiting_on_real_cond = 0 + @real_cond.broadcast + end + + def signal + if @num_waiting_on_real_cond > 0 + @num_waiting_on_real_cond -= 1 + @real_cond + else + @other_cond + end.signal + end + + def wait(timeout) + if Thread.current == @preferred_thread + @num_waiting_on_real_cond += 1 + @real_cond + else + @other_cond + end.wait(timeout) + end + end + + def with_a_bias_for(thread) + previous_cond = nil + new_cond = nil + synchronize do + previous_cond = @cond + @cond = new_cond = BiasedConditionVariable.new(@lock, @cond, thread) + end + yield + ensure + synchronize do + @cond = previous_cond if previous_cond + new_cond.broadcast_on_biased if new_cond # wake up any remaining sleepers + end + end + end + + # Connections must be leased while holding the main pool mutex. This is + # an internal subclass that also +.leases+ returned connections while + # still in queue's critical section (queue synchronizes with the same + # +@lock+ as the main pool) so that a returned connection is already + # leased and there is no need to re-enter synchronized block. + class ConnectionLeasingQueue < Queue # :nodoc: + include BiasableQueue + + private + def internal_poll(timeout) + conn = super + conn.lease if conn + conn + end + end + # Every +frequency+ seconds, the reaper will call +reap+ on +pool+. # A reaper instantiated with a nil frequency will never reap the # connection pool. @@ -241,56 +327,75 @@ module ActiveRecord # default max pool size to 5 @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 - # The cache of reserved connections mapped to threads - @reserved_connections = ThreadSafe::Cache.new(:initial_capacity => @size) + # The cache of threads mapped to reserved connections, the sole purpose + # of the cache is to speed-up +connection+ method, it is not the authoritative + # registry of which thread owns which connection, that is tracked by + # +connection.owner+ attr on each +connection+ instance. + # The invariant works like this: if there is mapping of +thread => conn+, + # then that +thread+ does indeed own that +conn+, however an absence of a such + # mapping does not mean that the +thread+ doesn't own the said connection, in + # that case +conn.owner+ attr should be consulted. + # Access and modification of +@thread_cached_conns+ does not require + # synchronization. + @thread_cached_conns = ThreadSafe::Cache.new(:initial_capacity => @size) @connections = [] @automatic_reconnect = true - @available = Queue.new self + # Connection pool allows for concurrent (outside the main `synchronize` section) + # establishment of new connections. This variable tracks the number of threads + # currently in the process of independently establishing connections to the DB. + @now_connecting = 0 + + # A boolean toggle that allows/disallows new connections. + @new_cons_enabled = true + + @available = ConnectionLeasingQueue.new self end # Retrieve the connection associated with the current thread, or call # #checkout to obtain one if necessary. # # #connection can be called any number of times; the connection is - # held in a hash keyed by the thread id. + # held in a cache keyed by a thread. def connection - # this is correctly done double-checked locking - # (ThreadSafe::Cache's lookups have volatile semantics) - @reserved_connections[current_connection_id] || synchronize do - @reserved_connections[current_connection_id] ||= checkout - end + @thread_cached_conns[connection_cache_key(Thread.current)] ||= checkout end # Is there an open connection that is being used for the current thread? + # + # This method only works for connections that have been abtained through + # #connection or #with_connection methods, connections obtained through + # #checkout will not be detected by #active_connection? def active_connection? - synchronize do - @reserved_connections.fetch(current_connection_id) { - return false - }.in_use? - end + @thread_cached_conns[connection_cache_key(Thread.current)] end # Signal that the thread is finished with the current connection. # #release_connection releases the connection-thread association # and returns the connection to the pool. - def release_connection(with_id = current_connection_id) - synchronize do - conn = @reserved_connections.delete(with_id) - checkin conn if conn + # + # This method only works for connections that have been obtained through + # #connection or #with_connection methods, connections obtained through + # #checkout will not be automatically released. + def release_connection(owner_thread = Thread.current) + if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread)) + checkin conn end end - # If a connection already exists yield it to the block. If no connection + # If a connection obtained through #connection or #with_connection methods + # already exists yield it to the block. If no such connection # exists checkout a connection, yield it to the block, and checkin the # connection when finished. def with_connection - connection_id = current_connection_id - fresh_connection = true unless active_connection? - yield connection + unless conn = @thread_cached_conns[connection_cache_key(Thread.current)] + conn = connection + fresh_connection = true + end + yield conn ensure - release_connection(connection_id) if fresh_connection + release_connection if fresh_connection end # Returns true if a connection has already been opened. @@ -299,32 +404,81 @@ module ActiveRecord end # Disconnects all connections in the pool, and clears the pool. - def disconnect! - synchronize do - @reserved_connections.clear - @connections.each do |conn| - checkin conn - conn.disconnect! + # + # Raises: + # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all + # connections in the pool within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds). + def disconnect(raise_on_acquisition_timeout = true) + with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do + synchronize do + @connections.each do |conn| + checkin conn + conn.disconnect! + end + @connections = [] + @available.clear end - @connections = [] - @available.clear end end - # Clears the cache which maps classes. - def clear_reloadable_connections! - synchronize do - @reserved_connections.clear - @connections.each do |conn| - checkin conn - conn.disconnect! if conn.requires_reloading? - end - @connections.delete_if(&:requires_reloading?) - @available.clear - @connections.each do |conn| - @available.add conn + # Disconnects all connections in the pool, and clears the pool. + # + # The pool first tries to gain ownership of all connections, if unable to + # do so within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds), the pool is forcefully + # disconneted wihout any regard for other connection owning threads. + def disconnect! + disconnect(false) + end + + # Clears the cache which maps classes and re-connects connections that + # require reloading. + # + # Raises: + # - +ExclusiveConnectionTimeoutError+ if unable to gain ownership of all + # connections in the pool within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds). + def clear_reloadable_connections(raise_on_acquisition_timeout = true) + num_new_conns_required = 0 + + with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do + synchronize do + @connections.each do |conn| + checkin conn + conn.disconnect! if conn.requires_reloading? + end + @connections.delete_if(&:requires_reloading?) + + @available.clear + + if @connections.size < @size + # because of the pruning done by this method, we might be running + # low on connections, while threads stuck in queue are helpless + # (not being able to establish new connections for themselves), + # see also more detailed explanation in +remove+ + num_new_conns_required = num_waiting_in_queue - @connections.size + end + + @connections.each do |conn| + @available.add conn + end end end + + bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0 + end + + # Clears the cache which maps classes and re-connects connections that + # require reloading. + # + # The pool first tries to gain ownership of all connections, if unable to + # do so within a timeout interval (default duration is + # +spec.config[:checkout_timeout] * 2+ seconds), the pool forcefully + # clears the cache and reloads connections without any regard for other + # connection owning threads. + def clear_reloadable_connections! + clear_reloadable_connections(false) end # Check-out a database connection from the pool, indicating that you want @@ -341,12 +495,8 @@ module ActiveRecord # # Raises: # - ConnectionTimeoutError: no connection can be obtained from the pool. - def checkout - synchronize do - conn = acquire_connection - conn.lease - checkout_and_verify(conn) - end + def checkout(checkout_timeout = @checkout_timeout) + checkout_and_verify(acquire_connection(checkout_timeout)) end # Check-in a database connection back into the pool, indicating that you @@ -356,14 +506,12 @@ module ActiveRecord # calling +checkout+ on this pool. def checkin(conn) synchronize do - owner = conn.owner + remove_connection_from_thread_cache conn conn.run_callbacks :checkin do conn.expire end - release conn, owner - @available.add conn end end @@ -371,14 +519,32 @@ module ActiveRecord # Remove a connection from the connection pool. The connection will # remain open and active but will no longer be managed by this pool. def remove(conn) + needs_new_connection = false + synchronize do + remove_connection_from_thread_cache conn + @connections.delete conn @available.delete conn - release conn, conn.owner - - @available.add checkout_new_connection if @available.any_waiting? + # @available.any_waiting? => true means that prior to removing this + # conn, the pool was at its max size (@connections.size == @size) + # this would mean that any threads stuck waiting in the queue wouldn't + # know they could checkout_new_connection, so let's do it for them. + # Because condition-wait loop is encapsulated in the Queue class + # (that in turn is oblivious to ConnectionPool implementation), threads + # that are "stuck" there are helpless, they have no way of creating + # new connections and are completely reliant on us feeding available + # connections into the Queue. + needs_new_connection = @available.any_waiting? end + + # This is intentionally done outside of the synchronized section as we + # would like not to hold the main mutex while checking out new connections, + # thus there is some chance that needs_new_connection information is now + # stale, we can live with that (bulk_make_new_connections will make + # sure not to exceed the pool's @size limit). + bulk_make_new_connections(1) if needs_new_connection end # Recover lost connections for the pool. A lost connection can occur if @@ -403,7 +569,118 @@ module ActiveRecord end end + def num_waiting_in_queue # :nodoc: + @available.num_waiting + end + private + #-- + # this is unfortunately not concurrent + def bulk_make_new_connections(num_new_conns_needed) + num_new_conns_needed.times do + # try_to_checkout_new_connection will not exceed pool's @size limit + if new_conn = try_to_checkout_new_connection + # make the new_conn available to the starving threads stuck @available Queue + checkin(new_conn) + end + end + end + + #-- + # From the discussion on Github: + # https://github.com/rails/rails/pull/14938#commitcomment-6601951 + # This hook-in method allows for easier monkey-patching fixes needed by + # JRuby users that use Fibers. + def connection_cache_key(thread) + thread + end + + # Take control of all existing connections so a "group" action such as + # reload/disconnect can be performed safely. It is no longer enough to + # wrap it in +synchronize+ because some pool's actions are allowed + # to be performed outside of the main +synchronize+ block. + def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true) + with_new_connections_blocked do + attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout) + yield + end + end + + def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true) + collected_conns = synchronize do + # account for our own connections + @connections.select {|conn| conn.owner == Thread.current} + end + + newly_checked_out = [] + timeout_time = Time.now + (@checkout_timeout * 2) + + @available.with_a_bias_for(Thread.current) do + while true + synchronize do + return if collected_conns.size == @connections.size && @now_connecting == 0 + remaining_timeout = timeout_time - Time.now + remaining_timeout = 0 if remaining_timeout < 0 + conn = checkout_for_exclusive_access(remaining_timeout) + collected_conns << conn + newly_checked_out << conn + end + end + end + rescue ExclusiveConnectionTimeoutError + # `raise_on_acquisition_timeout == false` means we are directed to ignore any + # timeouts and are expected to just give up: we've obtained as many connections + # as possible, note that in a case like that we don't return any of the + # `newly_checked_out` connections. + + if raise_on_acquisition_timeout + release_newly_checked_out = true + raise + end + rescue Exception # if something else went wrong + # this can't be a "naked" rescue, because we have should return conns + # even for non-StandardErrors + release_newly_checked_out = true + raise + ensure + if release_newly_checked_out && newly_checked_out + # releasing only those conns that were checked out in this method, conns + # checked outside this method (before it was called) are not for us to release + newly_checked_out.each {|conn| checkin(conn)} + end + end + + #-- + # Must be called in a synchronize block. + def checkout_for_exclusive_access(checkout_timeout) + checkout(checkout_timeout) + rescue ConnectionTimeoutError + # this block can't be easily moved into attempt_to_checkout_all_existing_connections's + # rescue block, because doing so would put it outside of synchronize section, without + # being in a critical section thread_report might become inaccurate + msg = "could not obtain ownership of all database connections in #{checkout_timeout} seconds" + + thread_report = [] + @connections.each do |conn| + unless conn.owner == Thread.current + thread_report << "#{conn} is owned by #{conn.owner}" + end + end + + msg << " (#{thread_report.join(', ')})" if thread_report.any? + + raise ExclusiveConnectionTimeoutError, msg + end + + def with_new_connections_blocked + previous_value = nil + synchronize do + previous_value, @new_cons_enabled = @new_cons_enabled, false + end + yield + ensure + synchronize { @new_cons_enabled = previous_value } + end # Acquire a connection by one of 1) immediately removing one # from the queue of available connections, 2) creating a new @@ -412,24 +689,31 @@ module ActiveRecord # # Raises: # - ConnectionTimeoutError if a connection could not be acquired - def acquire_connection - if conn = @available.poll + # + #-- + # Implementation detail: the connection returned by +acquire_connection+ + # will already be "+connection.lease+ -ed" to the current thread. + def acquire_connection(checkout_timeout) + # NOTE: we rely on `@available.poll` and `try_to_checkout_new_connection` to + # `conn.lease` the returned connection (and to do this in a `synchronized` + # section), this is not the cleanest implementation, as ideally we would + # `synchronize { conn.lease }` in this method, but by leaving it to `@available.poll` + # and `try_to_checkout_new_connection` we can piggyback on `synchronize` sections + # of the said methods and avoid an additional `synchronize` overhead. + if conn = @available.poll || try_to_checkout_new_connection conn - elsif @connections.size < @size - checkout_new_connection else reap - @available.poll(@checkout_timeout) + @available.poll(checkout_timeout) end end - def release(conn, owner) - thread_id = owner.object_id - - if @reserved_connections[thread_id] == conn - @reserved_connections.delete thread_id - end + #-- + # if owner_thread param is omitted, this must be called in synchronize block + def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) + @thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn) end + alias_method :release, :remove_connection_from_thread_cache def new_connection Base.send(spec.adapter_method, spec.config).tap do |conn| @@ -437,17 +721,46 @@ module ActiveRecord end end - def current_connection_id #:nodoc: - Base.connection_id ||= Thread.current.object_id + # If the pool is not at a +@size+ limit, establish new connection. Connecting + # to the DB is done outside main synchronized section. + #-- + # Implementation constraint: a newly established connection returned by this + # method must be in the +.leased+ state. + def try_to_checkout_new_connection + # first in synchronized section check if establishing new conns is allowed + # and increment @now_connecting, to prevent overstepping this pool's @size + # constraint + do_checkout = synchronize do + if @new_cons_enabled && (@connections.size + @now_connecting) < @size + @now_connecting += 1 + end + end + if do_checkout + begin + # if successfully incremented @now_connecting establish new connection + # outside of synchronized section + conn = checkout_new_connection + ensure + synchronize do + if conn + adopt_connection(conn) + # returned conn needs to be already leased + conn.lease + end + @now_connecting -= 1 + end + end + end + end + + def adopt_connection(conn) + conn.pool = self + @connections << conn end def checkout_new_connection raise ConnectionNotEstablished unless @automatic_reconnect - - c = new_connection - c.pool = self - @connections << c - c + new_connection end def checkout_and_verify(c) @@ -620,7 +933,9 @@ module ActiveRecord # A connection was established in an ancestor process that must have # subsequently forked. We can't reuse the connection, but we can copy # the specification and establish a new connection with it. - establish_connection owner, ancestor_pool.spec + establish_connection(owner, ancestor_pool.spec).tap do |pool| + pool.schema_cache = ancestor_pool.schema_cache if ancestor_pool.schema_cache + end else owner_to_pool[owner.name] = nil end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index 431fe25501..38dd9578fe 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -189,7 +189,7 @@ module ActiveRecord # You should consult the documentation for your database to understand the # semantics of these different levels: # - # * http://www.postgresql.org/docs/9.1/static/transaction-iso.html + # * http://www.postgresql.org/docs/current/static/transaction-iso.html # * https://dev.mysql.com/doc/refman/5.6/en/set-transaction.html # # An <tt>ActiveRecord::TransactionIsolationError</tt> will be raised if: diff --git a/activerecord/lib/active_record/connection_adapters/abstract/schema_definitions.rb b/activerecord/lib/active_record/connection_adapters/abstract/schema_definitions.rb index 0ccf0c498b..158b773e11 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/schema_definitions.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/schema_definitions.rb @@ -1,8 +1,3 @@ -require 'date' -require 'set' -require 'bigdecimal' -require 'bigdecimal/util' - module ActiveRecord module ConnectionAdapters #:nodoc: # Abstract representation of an index definition on a table. Instances of @@ -403,17 +398,12 @@ module ActiveRecord column(:updated_at, :datetime, options) end - # Adds a reference. Optionally adds a +type+ column, if the - # +:polymorphic+ option is provided. +references+ and +belongs_to+ - # are interchangeable. The reference column will be an +integer+ by default, - # the +:type+ option can be used to specify a different type. A foreign - # key will be created if the +:foreign_key+ option is passed. + # Adds a reference. # # t.references(:user) - # t.references(:user, type: "string") - # t.belongs_to(:supplier, polymorphic: true) + # t.belongs_to(:supplier, foreign_key: true) # - # See SchemaStatements#add_reference + # See SchemaStatements#add_reference for details of the options you can use. def references(*args, **options) args.each do |col| ReferenceDefinition.new(col, **options).add_to(self) @@ -647,15 +637,12 @@ module ActiveRecord @base.rename_column(name, column_name, new_column_name) end - # Adds a reference. Optionally adds a +type+ column, if - # <tt>:polymorphic</tt> option is provided. + # Adds a reference. # # t.references(:user) - # t.references(:user, type: "string") - # t.belongs_to(:supplier, polymorphic: true) # t.belongs_to(:supplier, foreign_key: true) # - # See SchemaStatements#add_reference + # See SchemaStatements#add_reference for details of the options you can use. def references(*args) options = args.extract_options! args.each do |ref_name| diff --git a/activerecord/lib/active_record/connection_adapters/abstract/schema_dumper.rb b/activerecord/lib/active_record/connection_adapters/abstract/schema_dumper.rb index deb014ad46..b944a8631c 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/schema_dumper.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/schema_dumper.rb @@ -27,10 +27,17 @@ module ActiveRecord spec[:type] = schema_type(column) spec[:null] = 'false' unless column.null - limit = column.limit || native_database_types[column.type][:limit] - spec[:limit] = limit.inspect if limit - spec[:precision] = column.precision.inspect if column.precision - spec[:scale] = column.scale.inspect if column.scale + if limit = schema_limit(column) + spec[:limit] = limit + end + + if precision = schema_precision(column) + spec[:precision] = precision + end + + if scale = schema_scale(column) + spec[:scale] = scale + end default = schema_default(column) if column.has_default? spec[:default] = default unless default.nil? @@ -53,6 +60,19 @@ module ActiveRecord column.type.to_s end + def schema_limit(column) + limit = column.limit || native_database_types[column.type][:limit] + limit.inspect if limit + end + + def schema_precision(column) + column.precision.inspect if column.precision + end + + def schema_scale(column) + column.scale.inspect if column.scale + end + def schema_default(column) type = lookup_cast_type_from_column(column) default = type.deserialize(column.default) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb index 9004d86b04..dd7dcb9cdd 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/schema_statements.rb @@ -649,11 +649,21 @@ module ActiveRecord indexes(table_name).detect { |i| i.name == index_name } end - # Adds a reference. Optionally adds a +type+ column, if <tt>:polymorphic</tt> option is provided. - # The reference column is an +integer+ by default, the <tt>:type</tt> option can be used to specify - # a different type. + # Adds a reference. The reference column is an integer by default, + # the <tt>:type</tt> option can be used to specify a different type. + # Optionally adds a +_type+ column, if <tt>:polymorphic</tt> option is provided. # <tt>add_reference</tt> and <tt>add_belongs_to</tt> are acceptable. # + # The +options+ hash can include the following keys: + # [<tt>:type</tt>] + # The reference column type. Defaults to +:integer+. + # [<tt>:index</tt>] + # Add an appropriate index. Defaults to false. + # [<tt>:foreign_key</tt>] + # Add an appropriate foreign key. Defaults to false. + # [<tt>:polymorphic</tt>] + # Whether an additional +_type+ column should be added. Defaults to false. + # # ====== Create a user_id integer column # # add_reference(:products, :user) @@ -662,10 +672,6 @@ module ActiveRecord # # add_reference(:products, :user, type: :string) # - # ====== Create a supplier_id and supplier_type columns - # - # add_belongs_to(:products, :supplier, polymorphic: true) - # # ====== Create supplier_id, supplier_type columns and appropriate index # # add_reference(:products, :supplier, polymorphic: true, index: true) diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 0705c22a8c..6d3a21a3dc 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -1,13 +1,9 @@ -require 'date' -require 'bigdecimal' -require 'bigdecimal/util' require 'active_record/type' require 'active_support/core_ext/benchmark' require 'active_record/connection_adapters/schema_cache' require 'active_record/connection_adapters/sql_type_metadata' require 'active_record/connection_adapters/abstract/schema_dumper' require 'active_record/connection_adapters/abstract/schema_creation' -require 'monitor' require 'arel/collectors/bind' require 'arel/collectors/sql_string' @@ -70,7 +66,6 @@ module ActiveRecord include DatabaseLimits include QueryCache include ActiveSupport::Callbacks - include MonitorMixin include ColumnDumper SIMPLE_INT = /\A\d+\z/ @@ -141,12 +136,20 @@ module ActiveRecord SchemaCreation.new self end + # this method must only be called while holding connection pool's mutex def lease - synchronize do - unless in_use? - @owner = Thread.current + if in_use? + msg = 'Cannot lease connection, ' + if @owner == Thread.current + msg << 'it is already leased by the current thread.' + else + msg << "it is already in use by a different thread: #{@owner}. " << + "Current thread: #{Thread.current}." end + raise ActiveRecordError, msg end + + @owner = Thread.current end def schema_cache=(cache) @@ -154,6 +157,7 @@ module ActiveRecord @schema_cache = cache end + # this method must only be called while holding connection pool's mutex def expire @owner = nil end diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index 8c2b4ccac4..00e3d2965b 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -1,4 +1,3 @@ -require 'arel/visitors/bind_visitor' require 'active_support/core_ext/string/strip' module ActiveRecord @@ -122,11 +121,14 @@ module ActiveRecord spec end - def prepare_column_options(column) - spec = super - spec.delete(:precision) if /time/ === column.sql_type && column.precision == 0 - spec.delete(:limit) if :boolean === column.type - spec + private + + def schema_limit(column) + super unless column.type == :boolean + end + + def schema_precision(column) + super unless /time/ === column.sql_type && column.precision == 0 end def schema_collation(column) @@ -136,7 +138,8 @@ module ActiveRecord column.collation.inspect if column.collation != @collation_cache[table_name] end end - private :schema_collation + + public class Column < ConnectionAdapters::Column # :nodoc: delegate :strict, :extra, to: :sql_type_metadata, allow_nil: true @@ -697,29 +700,11 @@ module ActiveRecord def type_to_sql(type, limit = nil, precision = nil, scale = nil) case type.to_s when 'binary' - case limit - when 0..0xfff; "varbinary(#{limit})" - when nil; "blob" - when 0x1000..0xffffffff; "blob(#{limit})" - else raise(ActiveRecordError, "No binary type has character length #{limit}") - end + binary_to_sql(limit) when 'integer' - case limit - when 1; 'tinyint' - when 2; 'smallint' - when 3; 'mediumint' - when nil, 4, 11; 'int(11)' # compatibility with MySQL default - when 5..8; 'bigint' - else raise(ActiveRecordError, "No integer type has byte size #{limit}") - end + integer_to_sql(limit) when 'text' - case limit - when 0..0xff; 'tinytext' - when nil, 0x100..0xffff; 'text' - when 0x10000..0xffffff; 'mediumtext' - when 0x1000000..0xffffffff; 'longtext' - else raise(ActiveRecordError, "No text type has character length #{limit}") - end + text_to_sql(limit) else super end @@ -974,10 +959,12 @@ module ActiveRecord wait_timeout = 2147483 unless wait_timeout.is_a?(Fixnum) variables['wait_timeout'] = self.class.type_cast_config_to_integer(wait_timeout) + defaults = [':default', :default].to_set + # Make MySQL reject illegal values rather than truncating or blanking them, see # http://dev.mysql.com/doc/refman/5.6/en/sql-mode.html#sqlmode_strict_all_tables # If the user has provided another value for sql_mode, don't replace it. - unless variables.has_key?('sql_mode') + unless variables.has_key?('sql_mode') || defaults.include?(@config[:strict]) variables['sql_mode'] = strict_mode? ? 'STRICT_ALL_TABLES' : '' end @@ -992,7 +979,7 @@ module ActiveRecord # Gather up all of the SET variables... variable_assignments = variables.map do |k, v| - if v == ':default' || v == :default + if defaults.include?(v) "@@SESSION.#{k} = DEFAULT" # Sets the value to the global or compile default elsif !v.nil? "@@SESSION.#{k} = #{quote(v)}" @@ -1017,6 +1004,36 @@ module ActiveRecord TableDefinition.new(native_database_types, name, temporary, options, as) end + def binary_to_sql(limit) # :nodoc: + case limit + when 0..0xfff; "varbinary(#{limit})" + when nil; "blob" + when 0x1000..0xffffffff; "blob(#{limit})" + else raise(ActiveRecordError, "No binary type has character length #{limit}") + end + end + + def integer_to_sql(limit) # :nodoc: + case limit + when 1; 'tinyint' + when 2; 'smallint' + when 3; 'mediumint' + when nil, 4, 11; 'int(11)' # compatibility with MySQL default + when 5..8; 'bigint' + else raise(ActiveRecordError, "No integer type has byte size #{limit}") + end + end + + def text_to_sql(limit) # :nodoc: + case limit + when 0..0xff; 'tinytext' + when nil, 0x100..0xffff; 'text' + when 0x10000..0xffffff; 'mediumtext' + when 0x1000000..0xffffffff; 'longtext' + else raise(ActiveRecordError, "No text type has character length #{limit}") + end + end + class MysqlString < Type::String # :nodoc: def serialize(value) case value diff --git a/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb index 18febf66b4..2ae462d773 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql_adapter.rb @@ -71,34 +71,10 @@ module ActiveRecord ADAPTER_NAME = 'MySQL'.freeze class StatementPool < ConnectionAdapters::StatementPool - def initialize(connection, max = 1000) - super - @cache = Hash.new { |h,pid| h[pid] = {} } - end - - def each(&block); cache.each(&block); end - def key?(key); cache.key?(key); end - def [](key); cache[key]; end - def length; cache.length; end - def delete(key); cache.delete(key); end - - def []=(sql, key) - while @max <= cache.size - cache.shift.last[:stmt].close - end - cache[sql] = key - end - - def clear - cache.each_value do |hash| - hash[:stmt].close - end - cache.clear - end - private - def cache - @cache[Process.pid] + + def dealloc(stmt) + stmt[:stmt].close end end @@ -416,8 +392,11 @@ module ActiveRecord # place when an error occurs. To support older MySQL versions, we # need to close the statement and delete the statement from the # cache. - stmt.close - @statements.delete sql + if binds.empty? + stmt.close + else + @statements.delete sql + end raise e end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/column.rb b/activerecord/lib/active_record/connection_adapters/postgresql/column.rb index be13ead120..bfa03fa136 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/column.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/column.rb @@ -8,7 +8,8 @@ module ActiveRecord def serial? return unless default_function - %r{\Anextval\('(?<table_name>.+)_#{name}_seq'::regclass\)\z} === default_function + table_name = @table_name || '(?<table_name>.+)' + %r{\Anextval\('"?#{table_name}_#{name}_seq"?'::regclass\)\z} === default_function end end end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/oid.rb b/activerecord/lib/active_record/connection_adapters/postgresql/oid.rb index 92349e2f9b..68752cdd80 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/oid.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/oid.rb @@ -12,6 +12,7 @@ require 'active_record/connection_adapters/postgresql/oid/json' require 'active_record/connection_adapters/postgresql/oid/jsonb' require 'active_record/connection_adapters/postgresql/oid/money' require 'active_record/connection_adapters/postgresql/oid/point' +require 'active_record/connection_adapters/postgresql/oid/rails_5_1_point' require 'active_record/connection_adapters/postgresql/oid/range' require 'active_record/connection_adapters/postgresql/oid/specialized_string' require 'active_record/connection_adapters/postgresql/oid/uuid' diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/oid/array.rb b/activerecord/lib/active_record/connection_adapters/postgresql/oid/array.rb index 3de794f797..25961a9869 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/oid/array.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/oid/array.rb @@ -45,6 +45,11 @@ module ActiveRecord delimiter == other.delimiter end + def type_cast_for_schema(value) + return super unless value.is_a?(::Array) + "[" + value.map { |v| subtype.type_cast_for_schema(v) }.join(", ") + "]" + end + private def type_cast_array(value, method) diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/oid/jsonb.rb b/activerecord/lib/active_record/connection_adapters/postgresql/oid/jsonb.rb index afc9383f91..87391b5dc7 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/oid/jsonb.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/oid/jsonb.rb @@ -9,7 +9,7 @@ module ActiveRecord def changed_in_place?(raw_old_value, new_value) # Postgres does not preserve insignificant whitespaces when - # roundtripping jsonb columns. This causes some false positives for + # round-tripping jsonb columns. This causes some false positives for # the comparison here. Therefore, we need to parse and re-dump the # raw value here to ensure the insignificant whitespaces are # consistent with our encoder's output. diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/oid/rails_5_1_point.rb b/activerecord/lib/active_record/connection_adapters/postgresql/oid/rails_5_1_point.rb new file mode 100644 index 0000000000..7427a25ad5 --- /dev/null +++ b/activerecord/lib/active_record/connection_adapters/postgresql/oid/rails_5_1_point.rb @@ -0,0 +1,50 @@ +module ActiveRecord + Point = Struct.new(:x, :y) + + module ConnectionAdapters + module PostgreSQL + module OID # :nodoc: + class Rails51Point < Type::Value # :nodoc: + include Type::Helpers::Mutable + + def type + :point + end + + def cast(value) + case value + when ::String + if value[0] == '(' && value[-1] == ')' + value = value[1...-1] + end + x, y = value.split(",") + build_point(x, y) + when ::Array + build_point(*value) + else + value + end + end + + def serialize(value) + if value.is_a?(ActiveRecord::Point) + "(#{number_for_point(value.x)},#{number_for_point(value.y)})" + else + super + end + end + + private + + def number_for_point(number) + number.to_s.gsub(/\.0$/, '') + end + + def build_point(x, y) + ActiveRecord::Point.new(Float(x), Float(y)) + end + end + end + end + end +end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb index 686671b007..595c635fc0 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb @@ -70,11 +70,7 @@ module ActiveRecord # Returns the list of all tables in the schema search path or a specified schema. def tables(name = nil) - select_values(<<-SQL, 'SCHEMA') - SELECT tablename - FROM pg_tables - WHERE schemaname = ANY (current_schemas(false)) - SQL + select_values("SELECT tablename FROM pg_tables WHERE schemaname = ANY(current_schemas(false))", 'SCHEMA') end # Returns true if table exists. @@ -100,11 +96,7 @@ module ActiveRecord # Returns true if schema exists. def schema_exists?(name) - select_value(<<-SQL, 'SCHEMA').to_i > 0 - SELECT COUNT(*) - FROM pg_namespace - WHERE nspname = '#{name}' - SQL + select_value("SELECT COUNT(*) FROM pg_namespace WHERE nspname = '#{name}'", 'SCHEMA').to_i > 0 end # Verifies existence of an index with a given name. @@ -192,24 +184,17 @@ module ActiveRecord # Returns the current database encoding format. def encoding - select_value(<<-end_sql, 'SCHEMA') - SELECT pg_encoding_to_char(pg_database.encoding) FROM pg_database - WHERE pg_database.datname LIKE '#{current_database}' - end_sql + select_value("SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname LIKE '#{current_database}'", 'SCHEMA') end # Returns the current database collation. def collation - select_value(<<-end_sql, 'SCHEMA') - SELECT pg_database.datcollate FROM pg_database WHERE pg_database.datname LIKE '#{current_database}' - end_sql + select_value("SELECT datcollate FROM pg_database WHERE datname LIKE '#{current_database}'", 'SCHEMA') end # Returns the current database ctype. def ctype - select_value(<<-end_sql, 'SCHEMA') - SELECT pg_database.datctype FROM pg_database WHERE pg_database.datname LIKE '#{current_database}' - end_sql + select_value("SELECT datctype FROM pg_database WHERE datname LIKE '#{current_database}'", 'SCHEMA') end # Returns an array of schema names. @@ -398,9 +383,7 @@ module ActiveRecord rename_table_indexes(table_name, new_name) end - # Adds a new column to the named table. - # See TableDefinition#column for details of the options you can use. - def add_column(table_name, column_name, type, options = {}) + def add_column(table_name, column_name, type, options = {}) #:nodoc: clear_cache! super end @@ -452,7 +435,7 @@ module ActiveRecord end # Renames a column in a table. - def rename_column(table_name, column_name, new_column_name) + def rename_column(table_name, column_name, new_column_name) #:nodoc: clear_cache! execute "ALTER TABLE #{quote_table_name(table_name)} RENAME COLUMN #{quote_column_name(column_name)} TO #{quote_column_name(new_column_name)}" rename_column_indexes(table_name, column_name, new_column_name) diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 2b33a5b9cb..2c43c46a3d 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -14,8 +14,6 @@ require "active_record/connection_adapters/postgresql/type_metadata" require "active_record/connection_adapters/postgresql/utils" require "active_record/connection_adapters/statement_pool" -require 'arel/visitors/bind_visitor' - require 'ipaddr' module ActiveRecord @@ -68,11 +66,11 @@ module ActiveRecord # defaults to true. # # Any further options are used as connection parameters to libpq. See - # http://www.postgresql.org/docs/9.1/static/libpq-connect.html for the + # http://www.postgresql.org/docs/current/static/libpq-connect.html for the # list of parameters. # # In addition, default connection parameters of libpq can be set per environment variables. - # See http://www.postgresql.org/docs/9.1/static/libpq-envars.html . + # See http://www.postgresql.org/docs/current/static/libpq-envars.html . class PostgreSQLAdapter < AbstractAdapter ADAPTER_NAME = 'PostgreSQL'.freeze @@ -211,44 +209,18 @@ module ActiveRecord def initialize(connection, max) super @counter = 0 - @cache = Hash.new { |h,pid| h[pid] = {} } end - def each(&block); cache.each(&block); end - def key?(key); cache.key?(key); end - def [](key); cache[key]; end - def length; cache.length; end - def next_key "a#{@counter + 1}" end def []=(sql, key) - while @max <= cache.size - dealloc(cache.shift.last) - end - @counter += 1 - cache[sql] = key - end - - def clear - cache.each_value do |stmt_key| - dealloc stmt_key - end - cache.clear - end - - def delete(sql_key) - dealloc cache[sql_key] - cache.delete sql_key + super.tap { @counter += 1 } end private - def cache - @cache[Process.pid] - end - def dealloc(key) @connection.query "DEALLOCATE #{key}" if connection_active? end @@ -452,7 +424,7 @@ module ActiveRecord @connection.server_version end - # See http://www.postgresql.org/docs/9.1/static/errcodes-appendix.html + # See http://www.postgresql.org/docs/current/static/errcodes-appendix.html FOREIGN_KEY_VIOLATION = "23503" UNIQUE_VIOLATION = "23505" @@ -726,7 +698,7 @@ module ActiveRecord end # SET statements from :variables config hash - # http://www.postgresql.org/docs/8.3/static/sql-set.html + # http://www.postgresql.org/docs/current/static/sql-set.html variables = @config[:variables] || {} variables.map do |k, v| if v == ':default' || v == :default @@ -866,6 +838,8 @@ module ActiveRecord ActiveRecord::Type.register(:jsonb, OID::Jsonb, adapter: :postgresql) ActiveRecord::Type.register(:money, OID::Money, adapter: :postgresql) ActiveRecord::Type.register(:point, OID::Point, adapter: :postgresql) + ActiveRecord::Type.register(:legacy_point, OID::Point, adapter: :postgresql) + ActiveRecord::Type.register(:rails_5_1_point, OID::Rails51Point, adapter: :postgresql) ActiveRecord::Type.register(:uuid, OID::Uuid, adapter: :postgresql) ActiveRecord::Type.register(:vector, OID::Vector, adapter: :postgresql) ActiveRecord::Type.register(:xml, OID::Xml, adapter: :postgresql) diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3/schema_creation.rb b/activerecord/lib/active_record/connection_adapters/sqlite3/schema_creation.rb new file mode 100644 index 0000000000..fe1dcbd710 --- /dev/null +++ b/activerecord/lib/active_record/connection_adapters/sqlite3/schema_creation.rb @@ -0,0 +1,15 @@ +module ActiveRecord + module ConnectionAdapters + module SQLite3 + class SchemaCreation < AbstractAdapter::SchemaCreation + private + def add_column_options!(sql, options) + if options[:collation] + sql << " COLLATE \"#{options[:collation]}\"" + end + super + end + end + end + end +end diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb index 3186769510..87129c42cf 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb @@ -1,6 +1,6 @@ require 'active_record/connection_adapters/abstract_adapter' require 'active_record/connection_adapters/statement_pool' -require 'arel/visitors/bind_visitor' +require 'active_record/connection_adapters/sqlite3/schema_creation' gem 'sqlite3', '~> 1.3.6' require 'sqlite3' @@ -78,40 +78,17 @@ module ActiveRecord end class StatementPool < ConnectionAdapters::StatementPool - def initialize(connection, max) - super - @cache = Hash.new { |h,pid| h[pid] = {} } - end - - def each(&block); cache.each(&block); end - def key?(key); cache.key?(key); end - def [](key); cache[key]; end - def length; cache.length; end - - def []=(sql, key) - while @max <= cache.size - dealloc(cache.shift.last[:stmt]) - end - cache[sql] = key - end - - def clear - cache.each_value do |hash| - dealloc hash[:stmt] - end - cache.clear - end - private - def cache - @cache[$$] - end def dealloc(stmt) - stmt.close unless stmt.closed? + stmt[:stmt].close unless stmt[:stmt].closed? end end + def schema_creation # :nodoc: + SQLite3::SchemaCreation.new self + end + def initialize(connection, logger, connection_options, config) super(connection, logger) @@ -372,9 +349,10 @@ module ActiveRecord field["dflt_value"] = $1.gsub('""', '"') end + collation = field['collation'] sql_type = field['type'] type_metadata = fetch_type_metadata(sql_type) - new_column(field['name'], field['dflt_value'], type_metadata, field['notnull'].to_i == 0) + new_column(field['name'], field['dflt_value'], type_metadata, field['notnull'].to_i == 0, nil, collation) end end @@ -469,6 +447,7 @@ module ActiveRecord self.null = options[:null] if options.include?(:null) self.precision = options[:precision] if options.include?(:precision) self.scale = options[:scale] if options.include?(:scale) + self.collation = options[:collation] if options.include?(:collation) end end end @@ -482,9 +461,9 @@ module ActiveRecord protected def table_structure(table_name) - structure = exec_query("PRAGMA table_info(#{quote_table_name(table_name)})", 'SCHEMA').to_hash + structure = exec_query("PRAGMA table_info(#{quote_table_name(table_name)})", 'SCHEMA') raise(ActiveRecord::StatementInvalid, "Could not find table '#{table_name}'") if structure.empty? - structure + table_structure_with_collation(table_name, structure) end def alter_table(table_name, options = {}) #:nodoc: @@ -519,7 +498,7 @@ module ActiveRecord @definition.column(column_name, column.type, :limit => column.limit, :default => column.default, :precision => column.precision, :scale => column.scale, - :null => column.null) + :null => column.null, collation: column.collation) end yield @definition if block_given? end @@ -581,6 +560,46 @@ module ActiveRecord super end end + + private + COLLATE_REGEX = /.*\"(\w+)\".*collate\s+\"(\w+)\".*/i.freeze + + def table_structure_with_collation(table_name, basic_structure) + collation_hash = {} + sql = "SELECT sql FROM + (SELECT * FROM sqlite_master UNION ALL + SELECT * FROM sqlite_temp_master) + WHERE type='table' and name='#{ table_name }' \;" + + # Result will have following sample string + # CREATE TABLE "users" ("id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + # "password_digest" varchar COLLATE "NOCASE"); + result = exec_query(sql, 'SCHEMA').first + + if result + # Splitting with left parantheses and picking up last will return all + # columns separated with comma(,). + columns_string = result["sql"].split('(').last + + columns_string.split(',').each do |column_string| + # This regex will match the column name and collation type and will save + # the value in $1 and $2 respectively. + collation_hash[$1] = $2 if (COLLATE_REGEX =~ column_string) + end + + basic_structure.map! do |column| + column_name = column['name'] + + if collation_hash.has_key? column_name + column['collation'] = collation_hash[column_name] + end + + column + end + else + basic_structure.to_hash + end + end end end end diff --git a/activerecord/lib/active_record/connection_adapters/statement_pool.rb b/activerecord/lib/active_record/connection_adapters/statement_pool.rb index c6b1bc8b5b..82e9ef3d3d 100644 --- a/activerecord/lib/active_record/connection_adapters/statement_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/statement_pool.rb @@ -4,35 +4,53 @@ module ActiveRecord include Enumerable def initialize(connection, max = 1000) + @cache = Hash.new { |h,pid| h[pid] = {} } @connection = connection @max = max end - def each - raise NotImplementedError + def each(&block) + cache.each(&block) end def key?(key) - raise NotImplementedError + cache.key?(key) end def [](key) - raise NotImplementedError + cache[key] end def length - raise NotImplementedError + cache.length end - def []=(sql, key) - raise NotImplementedError + def []=(sql, stmt) + while @max <= cache.size + dealloc(cache.shift.last) + end + cache[sql] = stmt end def clear - raise NotImplementedError + cache.each_value do |stmt| + dealloc stmt + end + cache.clear end def delete(key) + dealloc cache[key] + cache.delete(key) + end + + private + + def cache + @cache[Process.pid] + end + + def dealloc(stmt) raise NotImplementedError end end |