Use advisory locks to prevent concurrent migrations

- Addresses issue #22092
- Works on Postgres and MySQL
- Uses advisory locks because of two important properties:
  1. The can be obtained outside of the context of a transaction
  2. They are automatically released when the session ends, so if a
  migration process crashed for whatever reason the lock is not left
  open perpetually
- Adds get_advisory_lock and release_advisory_lock methods to database
  adapters
- Attempting to run a migration while another one is in process will
  raise a ConcurrentMigrationError instead of attempting to run in
  parallel with undefined behavior. This could be rescued and
  the migration could exit cleanly instead. Perhaps as a configuration
  option?

Technical Notes
==============

The Migrator uses generate_migrator_advisory_lock_key to build the key
for the lock. In order to be compatible across multiple adapters there
are some constraints on this key.
- Postgres limits us to 64 bit signed integers
- MySQL advisory locks are server-wide so we have to scope to the
  database
- To fulfil these requirements we use a Migrator salt (a randomly
  chosen signed integer with max length of 31 bits) that identifies
  the Rails migration process as the owner of the lock. We multiply
  this salt with a CRC32 unsigned integer hash of the database name to
  get a signed 64 bit integer that can also be converted to a string
  to act as a lock key in MySQL databases.
- It is important for subsequent versions of the Migrator to use the
  same salt, otherwise different versions of the Migrator will not see
  each other's locks.
This commit is contained in:
Sam Davies 2015-10-29 18:26:54 -03:00
parent 0174837bfa
commit 2c2a875546
8 changed files with 322 additions and 23 deletions

@ -214,6 +214,11 @@ def supports_savepoints?
false
end
# Does this adapter support application-enforced advisory locking?
def supports_advisory_locks?
false
end
# Should primary key values be selected from their corresponding
# sequence before the insert statement? If true, next_sequence_value
# is called before each insert to set the record's primary key.
@ -280,6 +285,20 @@ def disable_extension(name)
def enable_extension(name)
end
# This is meant to be implemented by the adapters that support advisory
# locks
#
# Return true if we got the lock, otherwise false
def get_advisory_lock(key) # :nodoc:
end
# This is meant to be implemented by the adapters that support advisory
# locks.
#
# Return true if we released the lock, otherwise false
def release_advisory_lock(key) # :nodoc:
end
# A list of extensions, to be filled in by adapters that support them.
def extensions
[]

@ -220,6 +220,20 @@ def supports_datetime_with_precision?
version >= '5.6.4'
end
# 5.0.0 definitely supports it, possibly supported by earlier versions but
# not sure
def supports_advisory_locks?
version >= '5.0.0'
end
def get_advisory_lock(key, timeout = 0) # :nodoc:
select_value("SELECT GET_LOCK('#{key}', #{timeout});").to_s == '1'
end
def release_advisory_lock(key) # :nodoc:
select_value("SELECT RELEASE_LOCK('#{key}')").to_s == '1'
end
def native_database_types
NATIVE_DATABASE_TYPES
end

@ -293,6 +293,10 @@ def supports_ddl_transactions?
true
end
def supports_advisory_locks?
true
end
def supports_explain?
true
end
@ -311,6 +315,20 @@ def supports_materialized_views?
postgresql_version >= 90300
end
def get_advisory_lock(key) # :nodoc:
unless key.is_a?(Integer) && key.bit_length <= 63
raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer")
end
select_value("SELECT pg_try_advisory_lock(#{key});")
end
def release_advisory_lock(key) # :nodoc:
unless key.is_a?(Integer) && key.bit_length <= 63
raise(ArgumentError, "Postgres requires advisory lock keys to be a signed 64 bit integer")
end
select_value("SELECT pg_advisory_unlock(#{key})")
end
def enable_extension(name)
exec_query("CREATE EXTENSION IF NOT EXISTS \"#{name}\"").tap {
reload_type_map

@ -135,6 +135,14 @@ def initialize(message = nil)
end
end
class ConcurrentMigrationError < MigrationError #:nodoc:
DEFAULT_MESSAGE = "Cannot run migrations because another migration process is currently running.".freeze
def initialize(message = DEFAULT_MESSAGE)
super
end
end
# = Active Record Migrations
#
# Migrations can manage the evolution of a schema used by several physical
@ -1042,32 +1050,18 @@ def current_migration
alias :current :current_migration
def run
migration = migrations.detect { |m| m.version == @target_version }
raise UnknownMigrationVersionError.new(@target_version) if migration.nil?
unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i))
begin
execute_migration_in_transaction(migration, @direction)
rescue => e
canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : ""
raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace
end
if use_advisory_lock?
with_advisory_lock { run_without_lock }
else
run_without_lock
end
end
def migrate
if !target && @target_version && @target_version > 0
raise UnknownMigrationVersionError.new(@target_version)
end
runnable.each do |migration|
Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger
begin
execute_migration_in_transaction(migration, @direction)
rescue => e
canceled_msg = use_transaction?(migration) ? "this and " : ""
raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace
end
if use_advisory_lock?
with_advisory_lock { migrate_without_lock }
else
migrate_without_lock
end
end
@ -1092,10 +1086,45 @@ def pending_migrations
end
def migrated
@migrated_versions ||= Set.new(self.class.get_all_versions)
@migrated_versions || load_migrated
end
def load_migrated
@migrated_versions = Set.new(self.class.get_all_versions)
end
private
def run_without_lock
migration = migrations.detect { |m| m.version == @target_version }
raise UnknownMigrationVersionError.new(@target_version) if migration.nil?
unless (up? && migrated.include?(migration.version.to_i)) || (down? && !migrated.include?(migration.version.to_i))
begin
execute_migration_in_transaction(migration, @direction)
rescue => e
canceled_msg = use_transaction?(migration) ? ", this migration was canceled" : ""
raise StandardError, "An error has occurred#{canceled_msg}:\n\n#{e}", e.backtrace
end
end
end
def migrate_without_lock
if !target && @target_version && @target_version > 0
raise UnknownMigrationVersionError.new(@target_version)
end
runnable.each do |migration|
Base.logger.info "Migrating to #{migration.name} (#{migration.version})" if Base.logger
begin
execute_migration_in_transaction(migration, @direction)
rescue => e
canceled_msg = use_transaction?(migration) ? "this and " : ""
raise StandardError, "An error has occurred, #{canceled_msg}all later migrations canceled:\n\n#{e}", e.backtrace
end
end
end
def ran?(migration)
migrated.include?(migration.version.to_i)
end
@ -1157,5 +1186,25 @@ def ddl_transaction(migration)
def use_transaction?(migration)
!migration.disable_ddl_transaction && Base.connection.supports_ddl_transactions?
end
def use_advisory_lock?
Base.connection.supports_advisory_locks?
end
def with_advisory_lock
key = generate_migrator_advisory_lock_key
got_lock = Base.connection.get_advisory_lock(key)
raise ConcurrentMigrationError unless got_lock
load_migrated # reload schema_migrations to be sure it wasn't changed by another process before we got the lock
yield
ensure
Base.connection.release_advisory_lock(key) if got_lock
end
MIGRATOR_SALT = 2053462845
def generate_migrator_advisory_lock_key
db_name_hash = Zlib.crc32(Base.connection.current_database)
MIGRATOR_SALT * db_name_hash
end
end
end

@ -170,6 +170,34 @@ def test_mysql_set_session_variable_to_default
end
end
def test_get_and_release_advisory_lock
key = "test_key"
got_lock = @connection.get_advisory_lock(key)
assert got_lock, "get_advisory_lock should have returned true but it didn't"
assert_equal test_lock_free(key), false,
"expected the test advisory lock to be held but it wasn't"
released_lock = @connection.release_advisory_lock(key)
assert released_lock, "expected release_advisory_lock to return true but it didn't"
assert test_lock_free(key), 'expected the test key to be available after releasing'
end
def test_release_non_existent_advisory_lock
fake_key = "fake_key"
released_non_existent_lock = @connection.release_advisory_lock(fake_key)
assert_equal released_non_existent_lock, false,
'expected release_advisory_lock to return false when there was no lock to release'
end
protected
def test_lock_free(key)
@connection.select_value("SELECT IS_FREE_LOCK('#{key}');") == '1'
end
private
def with_example_table(&block)

@ -131,4 +131,32 @@ def test_logs_name_rename_column_sql
ensure
@connection.execute "DROP TABLE `bar_baz`"
end
def test_get_and_release_advisory_lock
key = "test_key"
got_lock = @connection.get_advisory_lock(key)
assert got_lock, "get_advisory_lock should have returned true but it didn't"
assert_equal test_lock_free(key), false,
"expected the test advisory lock to be held but it wasn't"
released_lock = @connection.release_advisory_lock(key)
assert released_lock, "expected release_advisory_lock to return true but it didn't"
assert test_lock_free(key), 'expected the test key to be available after releasing'
end
def test_release_non_existent_advisory_lock
fake_key = "fake_key"
released_non_existent_lock = @connection.release_advisory_lock(fake_key)
assert_equal released_non_existent_lock, false,
'expected release_advisory_lock to return false when there was no lock to release'
end
protected
def test_lock_free(key)
@connection.select_value("SELECT IS_FREE_LOCK('#{key}');") == 1
end
end

@ -209,5 +209,47 @@ def test_set_session_variable_default
ActiveRecord::Base.establish_connection(orig_connection.deep_merge({:variables => {:debug_print_plan => :default}}))
end
end
def test_get_and_release_advisory_lock
key = 5295901941911233559
list_advisory_locks = <<-SQL
SELECT locktype,
(classid::bigint << 32) | objid::bigint AS lock_key
FROM pg_locks
WHERE locktype = 'advisory'
SQL
got_lock = @connection.get_advisory_lock(key)
assert got_lock, "get_advisory_lock should have returned true but it didn't"
advisory_lock = @connection.query(list_advisory_locks).find {|l| l[1] == key}
assert advisory_lock,
"expected to find an advisory lock with key #{key} but there wasn't one"
released_lock = @connection.release_advisory_lock(key)
assert released_lock, "expected release_advisory_lock to return true but it didn't"
advisory_locks = @connection.query(list_advisory_locks).select {|l| l[1] == key}
assert_empty advisory_locks,
"expected to have released advisory lock with key #{key} but it was still held"
end
def test_release_non_existent_advisory_lock
fake_key = 2940075057017742022
with_warning_suppression do
released_non_existent_lock = @connection.release_advisory_lock(fake_key)
assert_equal released_non_existent_lock, false,
'expected release_advisory_lock to return false when there was no lock to release'
end
end
protected
def with_warning_suppression
log_level = @connection.client_min_messages
@connection.client_min_messages = 'error'
yield
@connection.client_min_messages = log_level
end
end
end

@ -522,6 +522,79 @@ def test_out_of_range_limit_should_raise
end
end
if ActiveRecord::Base.connection.supports_advisory_locks?
def test_migrator_generates_valid_lock_key
migration = Class.new(ActiveRecord::Migration).new
migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
lock_key = migrator.send(:generate_migrator_advisory_lock_key)
assert ActiveRecord::Base.connection.get_advisory_lock(lock_key),
"the Migrator should have generated a valid lock key, but it didn't"
assert ActiveRecord::Base.connection.release_advisory_lock(lock_key),
"the Migrator should have generated a valid lock key, but it didn't"
end
def test_generate_migrator_advisory_lock_key
# It is important we are consistent with how we generate this so that
# exclusive locking works across migrator versions
migration = Class.new(ActiveRecord::Migration).new
migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
lock_key = migrator.send(:generate_migrator_advisory_lock_key)
current_database = ActiveRecord::Base.connection.current_database
salt = ActiveRecord::Migrator::MIGRATOR_SALT
expected_key = Zlib.crc32(current_database) * salt
assert lock_key == expected_key, "expected lock key generated by the migrator to be #{expected_key}, but it was #{lock_key} instead"
assert lock_key.is_a?(Fixnum), "expected lock key to be a Fixnum, but it wasn't"
assert lock_key.bit_length <= 63, "lock key must be a signed integer of max 63 bits magnitude"
end
def test_migrator_one_up_with_unavailable_lock
assert_no_column Person, :last_name
migration = Class.new(ActiveRecord::Migration) {
def version; 100 end
def migrate(x)
add_column "people", "last_name", :string
end
}.new
migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
lock_key = migrator.send(:generate_migrator_advisory_lock_key)
with_another_process_holding_lock(lock_key) do
assert_raise(ActiveRecord::ConcurrentMigrationError) { migrator.migrate }
end
assert_no_column Person, :last_name,
"without an advisory lock, the Migrator should not make any changes, but it did."
end
def test_migrator_one_up_with_unavailable_lock_using_run
assert_no_column Person, :last_name
migration = Class.new(ActiveRecord::Migration) {
def version; 100 end
def migrate(x)
add_column "people", "last_name", :string
end
}.new
migrator = ActiveRecord::Migrator.new(:up, [migration], 100)
lock_key = migrator.send(:generate_migrator_advisory_lock_key)
with_another_process_holding_lock(lock_key) do
assert_raise(ActiveRecord::ConcurrentMigrationError) { migrator.run }
end
assert_no_column Person, :last_name,
"without an advisory lock, the Migrator should not make any changes, but it did."
end
end
protected
# This is needed to isolate class_attribute assignments like `table_name_prefix`
# for each test case.
@ -531,6 +604,34 @@ def self.name; "Reminder"; end
def self.base_class; self; end
}
end
def with_another_process_holding_lock(lock_key)
other_process_has_lock = false
test_terminated = false
other_process = Thread.new do
begin
conn = ActiveRecord::Base.connection_pool.checkout
conn.get_advisory_lock(lock_key)
other_process_has_lock = true
while !test_terminated do # hold the lock open until we tested everything
sleep(0.01)
end
ensure
conn.release_advisory_lock(lock_key)
ActiveRecord::Base.connection_pool.checkin(conn)
end
end
while !other_process_has_lock # wait until the 'other process' has the lock
sleep(0.01)
end
yield
test_terminated = true
other_process.join
end
end
class ReservedWordsMigrationTest < ActiveRecord::TestCase