From 2c2a8755460ec3d32ece91c9766dbd0304ece028 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 29 Oct 2015 18:26:54 -0300 Subject: [PATCH] Use advisory locks to prevent concurrent migrations - Addresses issue #22092 - Works on Postgres and MySQL - Uses advisory locks because of two important properties: 1. The can be obtained outside of the context of a transaction 2. They are automatically released when the session ends, so if a migration process crashed for whatever reason the lock is not left open perpetually - Adds get_advisory_lock and release_advisory_lock methods to database adapters - Attempting to run a migration while another one is in process will raise a ConcurrentMigrationError instead of attempting to run in parallel with undefined behavior. This could be rescued and the migration could exit cleanly instead. Perhaps as a configuration option? Technical Notes ============== The Migrator uses generate_migrator_advisory_lock_key to build the key for the lock. In order to be compatible across multiple adapters there are some constraints on this key. - Postgres limits us to 64 bit signed integers - MySQL advisory locks are server-wide so we have to scope to the database - To fulfil these requirements we use a Migrator salt (a randomly chosen signed integer with max length of 31 bits) that identifies the Rails migration process as the owner of the lock. We multiply this salt with a CRC32 unsigned integer hash of the database name to get a signed 64 bit integer that can also be converted to a string to act as a lock key in MySQL databases. - It is important for subsequent versions of the Migrator to use the same salt, otherwise different versions of the Migrator will not see each other's locks. --- .../connection_adapters/abstract_adapter.rb | 19 ++++ .../abstract_mysql_adapter.rb | 14 +++ .../connection_adapters/postgresql_adapter.rb | 18 ++++ activerecord/lib/active_record/migration.rb | 95 ++++++++++++---- .../cases/adapters/mysql/connection_test.rb | 28 +++++ .../cases/adapters/mysql2/connection_test.rb | 28 +++++ .../adapters/postgresql/connection_test.rb | 42 ++++++++ activerecord/test/cases/migration_test.rb | 101 ++++++++++++++++++ 8 files changed, 322 insertions(+), 23 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 402159ac13..f5b2e9fa9d 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -214,6 +214,11 @@ def supports_savepoints? false end + # Does this adapter support application-enforced advisory locking? + def supports_advisory_locks? + false + end + # Should primary key values be selected from their corresponding # sequence before the insert statement? If true, next_sequence_value # is called before each insert to set the record's primary key. @@ -280,6 +285,20 @@ def disable_extension(name) def enable_extension(name) end + # This is meant to be implemented by the adapters that support advisory + # locks + # + # Return true if we got the lock, otherwise false + def get_advisory_lock(key) # :nodoc: + end + + # This is meant to be implemented by the adapters that support advisory + # locks. + # + # Return true if we released the lock, otherwise false + def release_advisory_lock(key) # :nodoc: + end + # A list of extensions, to be filled in by adapters that support them. def extensions [] diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index 251acf1c83..b775c18c1b 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -220,6 +220,20 @@ def supports_datetime_with_precision? version >= '5.6.4' end + # 5.0.0 definitely supports it, possibly supported by earlier versions but + # not sure + def supports_advisory_locks? + version >= '5.0.0' + end + + def get_advisory_lock(key, timeout = 0) # :nodoc: + select_value("SELECT GET_LOCK('#{key}', #{timeout});").to_s == '1' + end + + def release_advisory_lock(key) # :nodoc: + select_value("SELECT RELEASE_LOCK('#{key}')").to_s == '1' + end + def native_database_types NATIVE_DATABASE_TYPES end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 236c067fd5..0af7d99a4b 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -293,6 +293,10 @@ def supports_ddl_transactions? true end + def supports_advisory_locks? + true + end + def supports_explain? true end @@ -311,6 +315,20 @@ def supports_materialized_views? postgresql_version >= 90300 end + def get_advisory_lock(key) # :nodoc: + unless key.is_a?(Integer) && key.bit_length <= 63 + raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer") + end + select_value("SELECT pg_try_advisory_lock(#{key});") + end + + def release_advisory_lock(key) # :nodoc: + unless key.is_a?(Integer) && key.bit_length <= 63 + raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer") + end + select_value("SELECT pg_advisory_unlock(#{key})") + end + def enable_extension(name) exec_query("CREATE EXTENSION IF NOT EXISTS \"#{name}\"").tap { reload_type_map diff --git a/activerecord/lib/active_record/migration.rb b/activerecord/lib/active_record/migration.rb index c8b96b8de0..63ec8f6745 100644 --- a/activerecord/lib/active_record/migration.rb +++ b/activerecord/lib/active_record/migration.rb @@ -135,6 +135,14 @@ def initialize(message = nil) end end + class ConcurrentMigrationError < MigrationError #:nodoc: + DEFAULT_MESSAGE = "Cannot run migrations because another migration process is currently running.".freeze + + def initialize(message = DEFAULT_MESSAGE) + super + end + end + # = Active Record Migrations # # Migrations can manage the evolution of a schema used by several physical @@ -1042,32 +1050,18 @@ def current_migration alias :current :current_migration def run - migration = migrations.detect { |m| m.version == @target_version } - raise UnknownMigrationVersionError.new(@target_version) if migration.nil? - unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i)) - begin - execute_migration_in_transaction(migration, @direction) - rescue => e - canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : "" - raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace - end + if use_advisory_lock? + with_advisory_lock { run_without_lock } + else + run_without_lock end end def migrate - if !target && @target_version && @target_version > 0 - raise UnknownMigrationVersionError.new(@target_version) - end - - runnable.each do |migration| - Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger - - begin - execute_migration_in_transaction(migration, @direction) - rescue => e - canceled_msg = use_transaction?(migration) ? "this and " : "" - raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace - end + if use_advisory_lock? + with_advisory_lock { migrate_without_lock } + else + migrate_without_lock end end @@ -1092,10 +1086,45 @@ def pending_migrations end def migrated - @migrated_versions ||= Set.new(self.class.get_all_versions) + @migrated_versions || load_migrated + end + + def load_migrated + @migrated_versions = Set.new(self.class.get_all_versions) end private + + def run_without_lock + migration = migrations.detect { |m| m.version == @target_version } + raise UnknownMigrationVersionError.new(@target_version) if migration.nil? + unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i)) + begin + execute_migration_in_transaction(migration, @direction) + rescue => e + canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : "" + raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace + end + end + end + + def migrate_without_lock + if !target && @target_version && @target_version > 0 + raise UnknownMigrationVersionError.new(@target_version) + end + + runnable.each do |migration| + Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger + + begin + execute_migration_in_transaction(migration, @direction) + rescue => e + canceled_msg = use_transaction?(migration) ? "this and " : "" + raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace + end + end + end + def ran?(migration) migrated.include?(migration.version.to_i) end @@ -1157,5 +1186,25 @@ def ddl_transaction(migration) def use_transaction?(migration) !migration.disable_ddl_transaction && Base.connection.supports_ddl_transactions? end + + def use_advisory_lock? + Base.connection.supports_advisory_locks? + end + + def with_advisory_lock + key = generate_migrator_advisory_lock_key + got_lock = Base.connection.get_advisory_lock(key) + raise ConcurrentMigrationError unless got_lock + load_migrated # reload schema_migrations to be sure it wasn't changed by another process before we got the lock + yield + ensure + Base.connection.release_advisory_lock(key) if got_lock + end + + MIGRATOR_SALT = 2053462845 + def generate_migrator_advisory_lock_key + db_name_hash = Zlib.crc32(Base.connection.current_database) + MIGRATOR_SALT * db_name_hash + end end end diff --git a/activerecord/test/cases/adapters/mysql/connection_test.rb b/activerecord/test/cases/adapters/mysql/connection_test.rb index decac9e83b..75653ee9af 100644 --- a/activerecord/test/cases/adapters/mysql/connection_test.rb +++ b/activerecord/test/cases/adapters/mysql/connection_test.rb @@ -170,6 +170,34 @@ def test_mysql_set_session_variable_to_default end end + def test_get_and_release_advisory_lock + key = "test_key" + + got_lock = @connection.get_advisory_lock(key) + assert got_lock, "get_advisory_lock should have returned true but it didn't" + + assert_equal test_lock_free(key), false, + "expected the test advisory lock to be held but it wasn't" + + released_lock = @connection.release_advisory_lock(key) + assert released_lock, "expected release_advisory_lock to return true but it didn't" + + assert test_lock_free(key), 'expected the test key to be available after releasing' + end + + def test_release_non_existent_advisory_lock + fake_key = "fake_key" + released_non_existent_lock = @connection.release_advisory_lock(fake_key) + assert_equal released_non_existent_lock, false, + 'expected release_advisory_lock to return false when there was no lock to release' + end + + protected + + def test_lock_free(key) + @connection.select_value("SELECT IS_FREE_LOCK('#{key}');") == '1' + end + private def with_example_table(&block) diff --git a/activerecord/test/cases/adapters/mysql2/connection_test.rb b/activerecord/test/cases/adapters/mysql2/connection_test.rb index 000bcadebe..71c4028675 100644 --- a/activerecord/test/cases/adapters/mysql2/connection_test.rb +++ b/activerecord/test/cases/adapters/mysql2/connection_test.rb @@ -131,4 +131,32 @@ def test_logs_name_rename_column_sql ensure @connection.execute "DROP TABLE `bar_baz`" end + + def test_get_and_release_advisory_lock + key = "test_key" + + got_lock = @connection.get_advisory_lock(key) + assert got_lock, "get_advisory_lock should have returned true but it didn't" + + assert_equal test_lock_free(key), false, + "expected the test advisory lock to be held but it wasn't" + + released_lock = @connection.release_advisory_lock(key) + assert released_lock, "expected release_advisory_lock to return true but it didn't" + + assert test_lock_free(key), 'expected the test key to be available after releasing' + end + + def test_release_non_existent_advisory_lock + fake_key = "fake_key" + released_non_existent_lock = @connection.release_advisory_lock(fake_key) + assert_equal released_non_existent_lock, false, + 'expected release_advisory_lock to return false when there was no lock to release' + end + + protected + + def test_lock_free(key) + @connection.select_value("SELECT IS_FREE_LOCK('#{key}');") == 1 + end end diff --git a/activerecord/test/cases/adapters/postgresql/connection_test.rb b/activerecord/test/cases/adapters/postgresql/connection_test.rb index 722e2377c1..b12beb91de 100644 --- a/activerecord/test/cases/adapters/postgresql/connection_test.rb +++ b/activerecord/test/cases/adapters/postgresql/connection_test.rb @@ -209,5 +209,47 @@ def test_set_session_variable_default ActiveRecord::Base.establish_connection(orig_connection.deep_merge({:variables => {:debug_print_plan => :default}})) end end + + def test_get_and_release_advisory_lock + key = 5295901941911233559 + list_advisory_locks = <<-SQL + SELECT locktype, + (classid::bigint << 32) | objid::bigint AS lock_key + FROM pg_locks + WHERE locktype = 'advisory' + SQL + + got_lock = @connection.get_advisory_lock(key) + assert got_lock, "get_advisory_lock should have returned true but it didn't" + + advisory_lock = @connection.query(list_advisory_locks).find {|l| l[1] == key} + assert advisory_lock, + "expected to find an advisory lock with key #{key} but there wasn't one" + + released_lock = @connection.release_advisory_lock(key) + assert released_lock, "expected release_advisory_lock to return true but it didn't" + + advisory_locks = @connection.query(list_advisory_locks).select {|l| l[1] == key} + assert_empty advisory_locks, + "expected to have released advisory lock with key #{key} but it was still held" + end + + def test_release_non_existent_advisory_lock + fake_key = 2940075057017742022 + with_warning_suppression do + released_non_existent_lock = @connection.release_advisory_lock(fake_key) + assert_equal released_non_existent_lock, false, + 'expected release_advisory_lock to return false when there was no lock to release' + end + end + + protected + + def with_warning_suppression + log_level = @connection.client_min_messages + @connection.client_min_messages = 'error' + yield + @connection.client_min_messages = log_level + end end end diff --git a/activerecord/test/cases/migration_test.rb b/activerecord/test/cases/migration_test.rb index 10f1c7216f..9c4fa0df4f 100644 --- a/activerecord/test/cases/migration_test.rb +++ b/activerecord/test/cases/migration_test.rb @@ -522,6 +522,79 @@ def test_out_of_range_limit_should_raise end end + if ActiveRecord::Base.connection.supports_advisory_locks? + def test_migrator_generates_valid_lock_key + migration = Class.new(ActiveRecord::Migration).new + migrator = ActiveRecord::Migrator.new(:up, [migration], 100) + + lock_key = migrator.send(:generate_migrator_advisory_lock_key) + + assert ActiveRecord::Base.connection.get_advisory_lock(lock_key), + "the Migrator should have generated a valid lock key, but it didn't" + assert ActiveRecord::Base.connection.release_advisory_lock(lock_key), + "the Migrator should have generated a valid lock key, but it didn't" + end + + def test_generate_migrator_advisory_lock_key + # It is important we are consistent with how we generate this so that + # exclusive locking works across migrator versions + migration = Class.new(ActiveRecord::Migration).new + migrator = ActiveRecord::Migrator.new(:up, [migration], 100) + + lock_key = migrator.send(:generate_migrator_advisory_lock_key) + + current_database = ActiveRecord::Base.connection.current_database + salt = ActiveRecord::Migrator::MIGRATOR_SALT + expected_key = Zlib.crc32(current_database) * salt + + assert lock_key == expected_key, "expected lock key generated by the migrator to be #{expected_key}, but it was #{lock_key} instead" + assert lock_key.is_a?(Fixnum), "expected lock key to be a Fixnum, but it wasn't" + assert lock_key.bit_length <= 63, "lock key must be a signed integer of max 63 bits magnitude" + end + + def test_migrator_one_up_with_unavailable_lock + assert_no_column Person, :last_name + + migration = Class.new(ActiveRecord::Migration) { + def version; 100 end + def migrate(x) + add_column "people", "last_name", :string + end + }.new + + migrator = ActiveRecord::Migrator.new(:up, [migration], 100) + lock_key = migrator.send(:generate_migrator_advisory_lock_key) + + with_another_process_holding_lock(lock_key) do + assert_raise(ActiveRecord::ConcurrentMigrationError) { migrator.migrate } + end + + assert_no_column Person, :last_name, + "without an advisory lock, the Migrator should not make any changes, but it did." + end + + def test_migrator_one_up_with_unavailable_lock_using_run + assert_no_column Person, :last_name + + migration = Class.new(ActiveRecord::Migration) { + def version; 100 end + def migrate(x) + add_column "people", "last_name", :string + end + }.new + + migrator = ActiveRecord::Migrator.new(:up, [migration], 100) + lock_key = migrator.send(:generate_migrator_advisory_lock_key) + + with_another_process_holding_lock(lock_key) do + assert_raise(ActiveRecord::ConcurrentMigrationError) { migrator.run } + end + + assert_no_column Person, :last_name, + "without an advisory lock, the Migrator should not make any changes, but it did." + end + end + protected # This is needed to isolate class_attribute assignments like `table_name_prefix` # for each test case. @@ -531,6 +604,34 @@ def self.name; "Reminder"; end def self.base_class; self; end } end + + def with_another_process_holding_lock(lock_key) + other_process_has_lock = false + test_terminated = false + + other_process = Thread.new do + begin + conn = ActiveRecord::Base.connection_pool.checkout + conn.get_advisory_lock(lock_key) + other_process_has_lock = true + while !test_terminated do # hold the lock open until we tested everything + sleep(0.01) + end + ensure + conn.release_advisory_lock(lock_key) + ActiveRecord::Base.connection_pool.checkin(conn) + end + end + + while !other_process_has_lock # wait until the 'other process' has the lock + sleep(0.01) + end + + yield + + test_terminated = true + other_process.join + end end class ReservedWordsMigrationTest < ActiveRecord::TestCase