Add ActiveRecord::Base.connection.with_advisory_lock

This commit is contained in:
Rafael Mendonça França 2021-02-05 21:40:15 +00:00
parent c8a86d361c
commit 1ab4dbf8aa
No known key found for this signature in database
GPG Key ID: FC23B6D0F1EEE948
8 changed files with 166 additions and 26 deletions

@ -1,3 +1,13 @@
* Add `ActiveRecord::Base.connection.with_advisory_lock`.
This method allow applications to obtain an exclusive session level advisory lock,
if available, for the duration of the block.
If another session already have the lock, the method will return `false` and the block will
not be executed.
*Rafael Mendonça França*
* Removing trailing whitespace when matching columns in
`ActiveRecord::Sanitization.disallow_raw_sql!`.

@ -436,11 +436,28 @@ def advisory_locks_enabled? # :nodoc:
supports_advisory_locks? && @advisory_locks_enabled
end
# Obtains an exclusive session level advisory lock, if available, for the duration of the block.
#
# Returns +false+ without executing the block if the lock could not be obtained.
def with_advisory_lock(lock_id, timeout = 0)
lock_acquired = get_advisory_lock(lock_id, timeout)
if lock_acquired
yield
end
lock_acquired
ensure
if lock_acquired && !release_advisory_lock(lock_id)
raise ReleaseAdvisoryLockError
end
end
# This is meant to be implemented by the adapters that support advisory
# locks
#
# Return true if we got the lock, otherwise false
def get_advisory_lock(lock_id) # :nodoc:
def get_advisory_lock(lock_id, timeout = 0) # :nodoc:
end
# This is meant to be implemented by the adapters that support advisory

@ -372,7 +372,7 @@ def supports_lazy_transactions?
true
end
def get_advisory_lock(lock_id) # :nodoc:
def get_advisory_lock(lock_id, timeout = 0) # :nodoc:
unless lock_id.is_a?(Integer) && lock_id.bit_length <= 63
raise(ArgumentError, "PostgreSQL requires advisory lock ids to be a signed 64 bit integer")
end

@ -396,6 +396,13 @@ class QueryAborted < StatementInvalid
class LockWaitTimeout < StatementInvalid
end
# ReleaseAdvisoryLockError will be raised when a advisory lock fail to be released.
class ReleaseAdvisoryLockError < StatementInvalid
def initialize(message = "Failed to release advisory lock")
super
end
end
# StatementTimeout will be raised when statement timeout exceeded.
class StatementTimeout < QueryAborted
end

@ -167,7 +167,6 @@ def detailed_migration_message
class ConcurrentMigrationError < MigrationError #:nodoc:
DEFAULT_MESSAGE = "Cannot run migrations because another migration process is currently running."
RELEASE_LOCK_FAILED_MESSAGE = "Failed to release advisory lock"
def initialize(message = DEFAULT_MESSAGE)
super
@ -1397,16 +1396,18 @@ def with_advisory_lock
lock_id = generate_migrator_advisory_lock_id
with_advisory_lock_connection do |connection|
got_lock = connection.get_advisory_lock(lock_id)
raise ConcurrentMigrationError unless got_lock
load_migrated # reload schema_migrations to be sure it wasn't changed by another process before we got the lock
yield
ensure
if got_lock && !connection.release_advisory_lock(lock_id)
raise ConcurrentMigrationError.new(
ConcurrentMigrationError::RELEASE_LOCK_FAILED_MESSAGE
)
result = nil
got_lock = connection.with_advisory_lock(lock_id) do
load_migrated # reload schema_migrations to be sure it wasn't changed by another process before we got the lock
result = yield
end
raise ConcurrentMigrationError unless got_lock
result
rescue ReleaseAdvisoryLockError => e
raise ConcurrentMigrationError, e.message, e.backtrace
end
end

@ -201,8 +201,59 @@ def test_release_non_existent_advisory_lock
"expected release_advisory_lock to return false when there was no lock to release"
end
def test_with_advisory_lock
lock_name = "test lock'n'name"
got_lock = @connection.with_advisory_lock(lock_name) do
assert_equal test_lock_free(lock_name), false,
"expected the test advisory lock to be held but it wasn't"
end
assert got_lock, "get_advisory_lock should have returned true but it didn't"
assert test_lock_free(lock_name), "expected the test lock to be available after releasing"
end
def test_with_advisory_lock_with_an_already_existing_lock
lock_name = "test lock'n'name"
with_another_process_holding_lock(lock_name) do
assert_equal test_lock_free(lock_name), false, "expected the test advisory lock to be held but it wasn't"
got_lock = @connection.with_advisory_lock(lock_name) do
flunk "lock should not be acquired"
end
assert_equal test_lock_free(lock_name), false, "expected the test advisory lock to be held but it wasn't"
assert_not got_lock, "get_advisory_lock should have returned false but it didn't"
end
end
private
def test_lock_free(lock_name)
@connection.select_value("SELECT IS_FREE_LOCK(#{@connection.quote(lock_name)})") == 1
end
def with_another_process_holding_lock(lock_id)
thread_lock = Concurrent::CountDownLatch.new
test_terminated = Concurrent::CountDownLatch.new
other_process = Thread.new do
conn = ActiveRecord::Base.connection_pool.checkout
conn.get_advisory_lock(lock_id)
thread_lock.count_down
test_terminated.wait # hold the lock open until we tested everything
ensure
conn.release_advisory_lock(lock_id)
ActiveRecord::Base.connection_pool.checkin(conn)
end
thread_lock.wait # wait until the 'other process' has the lock
yield
test_terminated.count_down
other_process.join
end
end

@ -203,25 +203,19 @@ def test_set_session_timezone
def test_get_and_release_advisory_lock
lock_id = 5295901941911233559
list_advisory_locks = <<~SQL
SELECT locktype,
(classid::bigint << 32) | objid::bigint AS lock_id
FROM pg_locks
WHERE locktype = 'advisory'
SQL
got_lock = @connection.get_advisory_lock(lock_id)
assert got_lock, "get_advisory_lock should have returned true but it didn't"
advisory_lock = @connection.query(list_advisory_locks).find { |l| l[1] == lock_id }
assert advisory_lock,
advisory_lock = test_lock_free(lock_id)
assert_equal advisory_lock, false,
"expected to find an advisory lock with lock_id #{lock_id} but there wasn't one"
released_lock = @connection.release_advisory_lock(lock_id)
assert released_lock, "expected release_advisory_lock to return true but it didn't"
advisory_locks = @connection.query(list_advisory_locks).select { |l| l[1] == lock_id }
assert_empty advisory_locks,
advisory_lock = test_lock_free(lock_id)
assert advisory_lock,
"expected to have released advisory lock with lock_id #{lock_id} but it was still held"
end
@ -234,7 +228,70 @@ def test_release_non_existent_advisory_lock
end
end
def test_with_advisory_lock
lock_id = 5295901941911233559
got_lock = @connection.with_advisory_lock(lock_id) do
assert_equal test_lock_free(lock_id), false,
"expected to find an advisory lock with lock_id #{lock_id} but there wasn't one"
end
assert got_lock, "get_advisory_lock should have returned true but it didn't"
assert test_lock_free(lock_id), "expected to find an advisory lock with lock_id #{lock_id} but there wasn't one"
end
def test_with_advisory_lock_with_an_already_existing_lock
lock_id = 5295901941911233559
with_another_process_holding_lock(lock_id) do
assert_equal test_lock_free(lock_id), false,
"expected to find an advisory lock with lock_id #{lock_id} but there wasn't one"
got_lock = @connection.with_advisory_lock(lock_id) do
flunk "lock should not be acquired"
end
assert_equal test_lock_free(lock_id), false,
"expected to find an advisory lock with lock_id #{lock_id} but there wasn't one"
assert_not got_lock, "get_advisory_lock should have returned false but it didn't"
end
end
private
def test_lock_free(lock_id)
list_advisory_locks = <<~SQL
SELECT locktype,
(classid::bigint << 32) | objid::bigint AS lock_id
FROM pg_locks
WHERE locktype = 'advisory'
SQL
!@connection.query(list_advisory_locks).find { |l| l[1] == lock_id }
end
def with_another_process_holding_lock(lock_id)
thread_lock = Concurrent::CountDownLatch.new
test_terminated = Concurrent::CountDownLatch.new
other_process = Thread.new do
conn = ActiveRecord::Base.connection_pool.checkout
conn.get_advisory_lock(lock_id)
thread_lock.count_down
test_terminated.wait # hold the lock open until we tested everything
ensure
conn.release_advisory_lock(lock_id)
ActiveRecord::Base.connection_pool.checkin(conn)
end
thread_lock.wait # wait until the 'other process' has the lock
yield
test_terminated.count_down
other_process.join
end
def with_warning_suppression
log_level = @connection.client_min_messages
@connection.client_min_messages = "error"

@ -1016,10 +1016,7 @@ def test_with_advisory_lock_raises_the_right_error_when_it_fails_to_release_lock
end
end
assert_match(
/#{ActiveRecord::ConcurrentMigrationError::RELEASE_LOCK_FAILED_MESSAGE}/,
e.message
)
assert_match(/Failed to release advisory lock/, e.message)
end
end