aboutsummaryrefslogtreecommitdiffstats
path: root/activerecord
diff options
context:
space:
mode:
Diffstat (limited to 'activerecord')
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract_adapter.rb19
-rw-r--r--activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb14
-rw-r--r--activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb18
-rw-r--r--activerecord/lib/active_record/migration.rb95
-rw-r--r--activerecord/test/cases/adapters/mysql/connection_test.rb28
-rw-r--r--activerecord/test/cases/adapters/mysql2/connection_test.rb28
-rw-r--r--activerecord/test/cases/adapters/postgresql/connection_test.rb42
-rw-r--r--activerecord/test/cases/migration_test.rb101
8 files changed, 322 insertions, 23 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
index 402159ac13..f5b2e9fa9d 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb
@@ -214,6 +214,11 @@ module ActiveRecord
false
end
+ # Does this adapter support application-enforced advisory locking?
+ def supports_advisory_locks?
+ false
+ end
+
# Should primary key values be selected from their corresponding
# sequence before the insert statement? If true, next_sequence_value
# is called before each insert to set the record's primary key.
@@ -280,6 +285,20 @@ module ActiveRecord
def enable_extension(name)
end
+ # This is meant to be implemented by the adapters that support advisory
+ # locks
+ #
+ # Return true if we got the lock, otherwise false
+ def get_advisory_lock(key) # :nodoc:
+ end
+
+ # This is meant to be implemented by the adapters that support advisory
+ # locks.
+ #
+ # Return true if we released the lock, otherwise false
+ def release_advisory_lock(key) # :nodoc:
+ end
+
# A list of extensions, to be filled in by adapters that support them.
def extensions
[]
diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb
index 251acf1c83..b775c18c1b 100644
--- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb
+++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb
@@ -220,6 +220,20 @@ module ActiveRecord
version >= '5.6.4'
end
+ # 5.0.0 definitely supports it, possibly supported by earlier versions but
+ # not sure
+ def supports_advisory_locks?
+ version >= '5.0.0'
+ end
+
+ def get_advisory_lock(key, timeout = 0) # :nodoc:
+ select_value("SELECT GET_LOCK('#{key}', #{timeout});").to_s == '1'
+ end
+
+ def release_advisory_lock(key) # :nodoc:
+ select_value("SELECT RELEASE_LOCK('#{key}')").to_s == '1'
+ end
+
def native_database_types
NATIVE_DATABASE_TYPES
end
diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb
index 236c067fd5..0af7d99a4b 100644
--- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb
+++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb
@@ -293,6 +293,10 @@ module ActiveRecord
true
end
+ def supports_advisory_locks?
+ true
+ end
+
def supports_explain?
true
end
@@ -311,6 +315,20 @@ module ActiveRecord
postgresql_version >= 90300
end
+ def get_advisory_lock(key) # :nodoc:
+ unless key.is_a?(Integer) && key.bit_length <= 63
+ raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer")
+ end
+ select_value("SELECT pg_try_advisory_lock(#{key});")
+ end
+
+ def release_advisory_lock(key) # :nodoc:
+ unless key.is_a?(Integer) && key.bit_length <= 63
+ raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer")
+ end
+ select_value("SELECT pg_advisory_unlock(#{key})")
+ end
+
def enable_extension(name)
exec_query("CREATE EXTENSION IF NOT EXISTS \"#{name}\"").tap {
reload_type_map
diff --git a/activerecord/lib/active_record/migration.rb b/activerecord/lib/active_record/migration.rb
index c8b96b8de0..63ec8f6745 100644
--- a/activerecord/lib/active_record/migration.rb
+++ b/activerecord/lib/active_record/migration.rb
@@ -135,6 +135,14 @@ module ActiveRecord
end
end
+ class ConcurrentMigrationError < MigrationError #:nodoc:
+ DEFAULT_MESSAGE = "Cannot run migrations because another migration process is currently running.".freeze
+
+ def initialize(message = DEFAULT_MESSAGE)
+ super
+ end
+ end
+
# = Active Record Migrations
#
# Migrations can manage the evolution of a schema used by several physical
@@ -1042,32 +1050,18 @@ module ActiveRecord
alias :current :current_migration
def run
- migration = migrations.detect { |m| m.version == @target_version }
- raise UnknownMigrationVersionError.new(@target_version) if migration.nil?
- unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i))
- begin
- execute_migration_in_transaction(migration, @direction)
- rescue => e
- canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : ""
- raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace
- end
+ if use_advisory_lock?
+ with_advisory_lock { run_without_lock }
+ else
+ run_without_lock
end
end
def migrate
- if !target && @target_version && @target_version > 0
- raise UnknownMigrationVersionError.new(@target_version)
- end
-
- runnable.each do |migration|
- Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger
-
- begin
- execute_migration_in_transaction(migration, @direction)
- rescue => e
- canceled_msg = use_transaction?(migration) ? "this and " : ""
- raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace
- end
+ if use_advisory_lock?
+ with_advisory_lock { migrate_without_lock }
+ else
+ migrate_without_lock
end
end
@@ -1092,10 +1086,45 @@ module ActiveRecord
end
def migrated
- @migrated_versions ||= Set.new(self.class.get_all_versions)
+ @migrated_versions || load_migrated
+ end
+
+ def load_migrated
+ @migrated_versions = Set.new(self.class.get_all_versions)
end
private
+
+ def run_without_lock
+ migration = migrations.detect { |m| m.version == @target_version }
+ raise UnknownMigrationVersionError.new(@target_version) if migration.nil?
+ unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i))
+ begin
+ execute_migration_in_transaction(migration, @direction)
+ rescue => e
+ canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : ""
+ raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace
+ end
+ end
+ end
+
+ def migrate_without_lock
+ if !target && @target_version && @target_version > 0
+ raise UnknownMigrationVersionError.new(@target_version)
+ end
+
+ runnable.each do |migration|
+ Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger
+
+ begin
+ execute_migration_in_transaction(migration, @direction)
+ rescue => e
+ canceled_msg = use_transaction?(migration) ? "this and " : ""
+ raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace
+ end
+ end
+ end
+
def ran?(migration)
migrated.include?(migration.version.to_i)
end
@@ -1157,5 +1186,25 @@ module ActiveRecord
def use_transaction?(migration)
!migration.disable_ddl_transaction && Base.connection.supports_ddl_transactions?
end
+
+ def use_advisory_lock?
+ Base.connection.supports_advisory_locks?
+ end
+
+ def with_advisory_lock
+ key = generate_migrator_advisory_lock_key
+ got_lock = Base.connection.get_advisory_lock(key)
+ raise ConcurrentMigrationError unless got_lock
+ load_migrated # reload schema_migrations to be sure it wasn't changed by another process before we got the lock
+ yield
+ ensure
+ Base.connection.release_advisory_lock(key) if got_lock
+ end
+
+ MIGRATOR_SALT = 2053462845
+ def generate_migrator_advisory_lock_key
+ db_name_hash = Zlib.crc32(Base.connection.current_database)
+ MIGRATOR_SALT * db_name_hash
+ end
end
end
diff --git a/activerecord/test/cases/adapters/mysql/connection_test.rb b/activerecord/test/cases/adapters/mysql/connection_test.rb
index decac9e83b..75653ee9af 100644
--- a/activerecord/test/cases/adapters/mysql/connection_test.rb
+++ b/activerecord/test/cases/adapters/mysql/connection_test.rb
@@ -170,6 +170,34 @@ class MysqlConnectionTest < ActiveRecord::MysqlTestCase
end
end
+ def test_get_and_release_advisory_lock
+ key = "test_key"
+
+ got_lock = @connection.get_advisory_lock(key)
+ assert got_lock, "get_advisory_lock should have returned true but it didn't"
+
+ assert_equal test_lock_free(key), false,
+ "expected the test advisory lock to be held but it wasn't"
+
+ released_lock = @connection.release_advisory_lock(key)
+ assert released_lock, "expected release_advisory_lock to return true but it didn't"
+
+ assert test_lock_free(key), 'expected the test key to be available after releasing'
+ end
+
+ def test_release_non_existent_advisory_lock
+ fake_key = "fake_key"
+ released_non_existent_lock = @connection.release_advisory_lock(fake_key)
+ assert_equal released_non_existent_lock, false,
+ 'expected release_advisory_lock to return false when there was no lock to release'
+ end
+
+ protected
+
+ def test_lock_free(key)
+ @connection.select_value("SELECT IS_FREE_LOCK('#{key}');") == '1'
+ end
+
private
def with_example_table(&block)
diff --git a/activerecord/test/cases/adapters/mysql2/connection_test.rb b/activerecord/test/cases/adapters/mysql2/connection_test.rb
index 000bcadebe..71c4028675 100644
--- a/activerecord/test/cases/adapters/mysql2/connection_test.rb
+++ b/activerecord/test/cases/adapters/mysql2/connection_test.rb
@@ -131,4 +131,32 @@ class Mysql2ConnectionTest < ActiveRecord::Mysql2TestCase
ensure
@connection.execute "DROP TABLE `bar_baz`"
end
+
+ def test_get_and_release_advisory_lock
+ key = "test_key"
+
+ got_lock = @connection.get_advisory_lock(key)
+ assert got_lock, "get_advisory_lock should have returned true but it didn't"
+
+ assert_equal test_lock_free(key), false,
+ "expected the test advisory lock to be held but it wasn't"
+
+ released_lock = @connection.release_advisory_lock(key)
+ assert released_lock, "expected release_advisory_lock to return true but it didn't"
+
+ assert test_lock_free(key), 'expected the test key to be available after releasing'
+ end
+
+ def test_release_non_existent_advisory_lock
+ fake_key = "fake_key"
+ released_non_existent_lock = @connection.release_advisory_lock(fake_key)
+ assert_equal released_non_existent_lock, false,
+ 'expected release_advisory_lock to return false when there was no lock to release'
+ end
+
+ protected
+
+ def test_lock_free(key)
+ @connection.select_value("SELECT IS_FREE_LOCK('#{key}');") == 1
+ end
end
diff --git a/activerecord/test/cases/adapters/postgresql/connection_test.rb b/activerecord/test/cases/adapters/postgresql/connection_test.rb
index 722e2377c1..b12beb91de 100644
--- a/activerecord/test/cases/adapters/postgresql/connection_test.rb
+++ b/activerecord/test/cases/adapters/postgresql/connection_test.rb
@@ -209,5 +209,47 @@ module ActiveRecord
ActiveRecord::Base.establish_connection(orig_connection.deep_merge({:variables => {:debug_print_plan => :default}}))
end
end
+
+ def test_get_and_release_advisory_lock
+ key = 5295901941911233559
+ list_advisory_locks = <<-SQL
+ SELECT locktype,
+ (classid::bigint << 32) | objid::bigint AS lock_key
+ FROM pg_locks
+ WHERE locktype = 'advisory'
+ SQL
+
+ got_lock = @connection.get_advisory_lock(key)
+ assert got_lock, "get_advisory_lock should have returned true but it didn't"
+
+ advisory_lock = @connection.query(list_advisory_locks).find {|l| l[1] == key}
+ assert advisory_lock,
+ "expected to find an advisory lock with key #{key} but there wasn't one"
+
+ released_lock = @connection.release_advisory_lock(key)
+ assert released_lock, "expected release_advisory_lock to return true but it didn't"
+
+ advisory_locks = @connection.query(list_advisory_locks).select {|l| l[1] == key}
+ assert_empty advisory_locks,
+ "expected to have released advisory lock with key #{key} but it was still held"
+ end
+
+ def test_release_non_existent_advisory_lock
+ fake_key = 2940075057017742022
+ with_warning_suppression do
+ released_non_existent_lock = @connection.release_advisory_lock(fake_key)
+ assert_equal released_non_existent_lock, false,
+ 'expected release_advisory_lock to return false when there was no lock to release'
+ end
+ end
+
+ protected
+
+ def with_warning_suppression
+ log_level = @connection.client_min_messages
+ @connection.client_min_messages = 'error'
+ yield
+ @connection.client_min_messages = log_level
+ end
end
end
diff --git a/activerecord/test/cases/migration_test.rb b/activerecord/test/cases/migration_test.rb
index 10f1c7216f..9c4fa0df4f 100644
--- a/activerecord/test/cases/migration_test.rb
+++ b/activerecord/test/cases/migration_test.rb
@@ -522,6 +522,79 @@ class MigrationTest < ActiveRecord::TestCase
end
end
+ if ActiveRecord::Base.connection.supports_advisory_locks?
+ def test_migrator_generates_valid_lock_key
+ migration = Class.new(ActiveRecord::Migration).new
+ migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
+
+ lock_key = migrator.send(:generate_migrator_advisory_lock_key)
+
+ assert ActiveRecord::Base.connection.get_advisory_lock(lock_key),
+ "the Migrator should have generated a valid lock key, but it didn't"
+ assert ActiveRecord::Base.connection.release_advisory_lock(lock_key),
+ "the Migrator should have generated a valid lock key, but it didn't"
+ end
+
+ def test_generate_migrator_advisory_lock_key
+ # It is important we are consistent with how we generate this so that
+ # exclusive locking works across migrator versions
+ migration = Class.new(ActiveRecord::Migration).new
+ migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
+
+ lock_key = migrator.send(:generate_migrator_advisory_lock_key)
+
+ current_database = ActiveRecord::Base.connection.current_database
+ salt = ActiveRecord::Migrator::MIGRATOR_SALT
+ expected_key = Zlib.crc32(current_database) * salt
+
+ assert lock_key == expected_key, "expected lock key generated by the migrator to be #{expected_key}, but it was #{lock_key} instead"
+ assert lock_key.is_a?(Fixnum), "expected lock key to be a Fixnum, but it wasn't"
+ assert lock_key.bit_length <= 63, "lock key must be a signed integer of max 63 bits magnitude"
+ end
+
+ def test_migrator_one_up_with_unavailable_lock
+ assert_no_column Person, :last_name
+
+ migration = Class.new(ActiveRecord::Migration) {
+ def version; 100 end
+ def migrate(x)
+ add_column "people", "last_name", :string
+ end
+ }.new
+
+ migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
+ lock_key = migrator.send(:generate_migrator_advisory_lock_key)
+
+ with_another_process_holding_lock(lock_key) do
+ assert_raise(ActiveRecord::ConcurrentMigrationError) { migrator.migrate }
+ end
+
+ assert_no_column Person, :last_name,
+ "without an advisory lock, the Migrator should not make any changes, but it did."
+ end
+
+ def test_migrator_one_up_with_unavailable_lock_using_run
+ assert_no_column Person, :last_name
+
+ migration = Class.new(ActiveRecord::Migration) {
+ def version; 100 end
+ def migrate(x)
+ add_column "people", "last_name", :string
+ end
+ }.new
+
+ migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
+ lock_key = migrator.send(:generate_migrator_advisory_lock_key)
+
+ with_another_process_holding_lock(lock_key) do
+ assert_raise(ActiveRecord::ConcurrentMigrationError) { migrator.run }
+ end
+
+ assert_no_column Person, :last_name,
+ "without an advisory lock, the Migrator should not make any changes, but it did."
+ end
+ end
+
protected
# This is needed to isolate class_attribute assignments like `table_name_prefix`
# for each test case.
@@ -531,6 +604,34 @@ class MigrationTest < ActiveRecord::TestCase
def self.base_class; self; end
}
end
+
+ def with_another_process_holding_lock(lock_key)
+ other_process_has_lock = false
+ test_terminated = false
+
+ other_process = Thread.new do
+ begin
+ conn = ActiveRecord::Base.connection_pool.checkout
+ conn.get_advisory_lock(lock_key)
+ other_process_has_lock = true
+ while !test_terminated do # hold the lock open until we tested everything
+ sleep(0.01)
+ end
+ ensure
+ conn.release_advisory_lock(lock_key)
+ ActiveRecord::Base.connection_pool.checkin(conn)
+ end
+ end
+
+ while !other_process_has_lock # wait until the 'other process' has the lock
+ sleep(0.01)
+ end
+
+ yield
+
+ test_terminated = true
+ other_process.join
+ end
end
class ReservedWordsMigrationTest < ActiveRecord::TestCase