diff options
author | Sam Davies <seivadmas@gmail.com> | 2015-10-29 18:26:54 -0300 |
---|---|---|
committer | Sam Davies <seivadmas@gmail.com> | 2015-10-30 14:04:16 -0300 |
commit | 2c2a8755460ec3d32ece91c9766dbd0304ece028 (patch) | |
tree | 350a241e5ff5e07ad395455fe3d567dea70ede0f /activerecord/lib | |
parent | 0174837bfa263c55cf727f23028d8f28da192d14 (diff) | |
download | rails-2c2a8755460ec3d32ece91c9766dbd0304ece028.tar.gz rails-2c2a8755460ec3d32ece91c9766dbd0304ece028.tar.bz2 rails-2c2a8755460ec3d32ece91c9766dbd0304ece028.zip |
Use advisory locks to prevent concurrent migrations
- Addresses issue #22092
- Works on Postgres and MySQL
- Uses advisory locks because of two important properties:
1. The can be obtained outside of the context of a transaction
2. They are automatically released when the session ends, so if a
migration process crashed for whatever reason the lock is not left
open perpetually
- Adds get_advisory_lock and release_advisory_lock methods to database
adapters
- Attempting to run a migration while another one is in process will
raise a ConcurrentMigrationError instead of attempting to run in
parallel with undefined behavior. This could be rescued and
the migration could exit cleanly instead. Perhaps as a configuration
option?
Technical Notes
==============
The Migrator uses generate_migrator_advisory_lock_key to build the key
for the lock. In order to be compatible across multiple adapters there
are some constraints on this key.
- Postgres limits us to 64 bit signed integers
- MySQL advisory locks are server-wide so we have to scope to the
database
- To fulfil these requirements we use a Migrator salt (a randomly
chosen signed integer with max length of 31 bits) that identifies
the Rails migration process as the owner of the lock. We multiply
this salt with a CRC32 unsigned integer hash of the database name to
get a signed 64 bit integer that can also be converted to a string
to act as a lock key in MySQL databases.
- It is important for subsequent versions of the Migrator to use the
same salt, otherwise different versions of the Migrator will not see
each other's locks.
Diffstat (limited to 'activerecord/lib')
4 files changed, 123 insertions, 23 deletions
diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 402159ac13..f5b2e9fa9d 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -214,6 +214,11 @@ module ActiveRecord false end + # Does this adapter support application-enforced advisory locking? + def supports_advisory_locks? + false + end + # Should primary key values be selected from their corresponding # sequence before the insert statement? If true, next_sequence_value # is called before each insert to set the record's primary key. @@ -280,6 +285,20 @@ module ActiveRecord def enable_extension(name) end + # This is meant to be implemented by the adapters that support advisory + # locks + # + # Return true if we got the lock, otherwise false + def get_advisory_lock(key) # :nodoc: + end + + # This is meant to be implemented by the adapters that support advisory + # locks. + # + # Return true if we released the lock, otherwise false + def release_advisory_lock(key) # :nodoc: + end + # A list of extensions, to be filled in by adapters that support them. def extensions [] diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index 251acf1c83..b775c18c1b 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -220,6 +220,20 @@ module ActiveRecord version >= '5.6.4' end + # 5.0.0 definitely supports it, possibly supported by earlier versions but + # not sure + def supports_advisory_locks? + version >= '5.0.0' + end + + def get_advisory_lock(key, timeout = 0) # :nodoc: + select_value("SELECT GET_LOCK('#{key}', #{timeout});").to_s == '1' + end + + def release_advisory_lock(key) # :nodoc: + select_value("SELECT RELEASE_LOCK('#{key}')").to_s == '1' + end + def native_database_types NATIVE_DATABASE_TYPES end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 236c067fd5..0af7d99a4b 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -293,6 +293,10 @@ module ActiveRecord true end + def supports_advisory_locks? + true + end + def supports_explain? true end @@ -311,6 +315,20 @@ module ActiveRecord postgresql_version >= 90300 end + def get_advisory_lock(key) # :nodoc: + unless key.is_a?(Integer) && key.bit_length <= 63 + raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer") + end + select_value("SELECT pg_try_advisory_lock(#{key});") + end + + def release_advisory_lock(key) # :nodoc: + unless key.is_a?(Integer) && key.bit_length <= 63 + raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer") + end + select_value("SELECT pg_advisory_unlock(#{key})") + end + def enable_extension(name) exec_query("CREATE EXTENSION IF NOT EXISTS \"#{name}\"").tap { reload_type_map diff --git a/activerecord/lib/active_record/migration.rb b/activerecord/lib/active_record/migration.rb index c8b96b8de0..63ec8f6745 100644 --- a/activerecord/lib/active_record/migration.rb +++ b/activerecord/lib/active_record/migration.rb @@ -135,6 +135,14 @@ module ActiveRecord end end + class ConcurrentMigrationError < MigrationError #:nodoc: + DEFAULT_MESSAGE = "Cannot run migrations because another migration process is currently running.".freeze + + def initialize(message = DEFAULT_MESSAGE) + super + end + end + # = Active Record Migrations # # Migrations can manage the evolution of a schema used by several physical @@ -1042,32 +1050,18 @@ module ActiveRecord alias :current :current_migration def run - migration = migrations.detect { |m| m.version == @target_version } - raise UnknownMigrationVersionError.new(@target_version) if migration.nil? - unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i)) - begin - execute_migration_in_transaction(migration, @direction) - rescue => e - canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : "" - raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace - end + if use_advisory_lock? + with_advisory_lock { run_without_lock } + else + run_without_lock end end def migrate - if !target && @target_version && @target_version > 0 - raise UnknownMigrationVersionError.new(@target_version) - end - - runnable.each do |migration| - Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger - - begin - execute_migration_in_transaction(migration, @direction) - rescue => e - canceled_msg = use_transaction?(migration) ? "this and " : "" - raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace - end + if use_advisory_lock? + with_advisory_lock { migrate_without_lock } + else + migrate_without_lock end end @@ -1092,10 +1086,45 @@ module ActiveRecord end def migrated - @migrated_versions ||= Set.new(self.class.get_all_versions) + @migrated_versions || load_migrated + end + + def load_migrated + @migrated_versions = Set.new(self.class.get_all_versions) end private + + def run_without_lock + migration = migrations.detect { |m| m.version == @target_version } + raise UnknownMigrationVersionError.new(@target_version) if migration.nil? + unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i)) + begin + execute_migration_in_transaction(migration, @direction) + rescue => e + canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : "" + raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace + end + end + end + + def migrate_without_lock + if !target && @target_version && @target_version > 0 + raise UnknownMigrationVersionError.new(@target_version) + end + + runnable.each do |migration| + Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger + + begin + execute_migration_in_transaction(migration, @direction) + rescue => e + canceled_msg = use_transaction?(migration) ? "this and " : "" + raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace + end + end + end + def ran?(migration) migrated.include?(migration.version.to_i) end @@ -1157,5 +1186,25 @@ module ActiveRecord def use_transaction?(migration) !migration.disable_ddl_transaction && Base.connection.supports_ddl_transactions? end + + def use_advisory_lock? + Base.connection.supports_advisory_locks? + end + + def with_advisory_lock + key = generate_migrator_advisory_lock_key + got_lock = Base.connection.get_advisory_lock(key) + raise ConcurrentMigrationError unless got_lock + load_migrated # reload schema_migrations to be sure it wasn't changed by another process before we got the lock + yield + ensure + Base.connection.release_advisory_lock(key) if got_lock + end + + MIGRATOR_SALT = 2053462845 + def generate_migrator_advisory_lock_key + db_name_hash = Zlib.crc32(Base.connection.current_database) + MIGRATOR_SALT * db_name_hash + end end end |