Introduce synchronization around connection pool access
- use new active support Module#synchronize - allow_concurrency now switches between a null monitor and a regular monitor (defaulting to null monitor to avoid overhead)
This commit is contained in:
parent
b185d157fe
commit
50cd4bdc99
@ -1,14 +1,12 @@
|
||||
require 'set'
|
||||
|
||||
module ActiveRecord
|
||||
module ConnectionAdapters
|
||||
class ConnectionPool
|
||||
# Check for activity after at least +verification_timeout+ seconds.
|
||||
# Defaults to 0 (always check.)
|
||||
attr_accessor :verification_timeout
|
||||
delegate :verification_timeout, :to => "::ActiveRecord::Base"
|
||||
attr_reader :active_connections, :spec
|
||||
|
||||
def initialize(spec)
|
||||
@verification_timeout = 0
|
||||
|
||||
# The thread id -> adapter cache.
|
||||
@active_connections = {}
|
||||
|
||||
@ -44,7 +42,7 @@ def clear_active_connections!
|
||||
end
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes
|
||||
# Clears the cache which maps classes
|
||||
def clear_reloadable_connections!
|
||||
@active_connections.each do |name, conn|
|
||||
if conn.requires_reloading?
|
||||
@ -60,7 +58,7 @@ def verify_active_connections! #:nodoc:
|
||||
conn.disconnect!
|
||||
end
|
||||
active_connections.each_value do |connection|
|
||||
connection.verify!(@verification_timeout)
|
||||
connection.verify!(verification_timeout)
|
||||
end
|
||||
end
|
||||
|
||||
@ -70,7 +68,7 @@ def retrieve_connection #:nodoc:
|
||||
name = active_connection_name
|
||||
if conn = active_connections[name]
|
||||
# Verify the connection.
|
||||
conn.verify!(@verification_timeout)
|
||||
conn.verify!(verification_timeout)
|
||||
else
|
||||
self.connection = spec
|
||||
conn = active_connections[name]
|
||||
@ -119,6 +117,7 @@ def remove_stale_cached_threads!(cache, &block)
|
||||
|
||||
def clear_entries!(cache, keys, &block)
|
||||
keys.each do |key|
|
||||
next unless cache.has_key?(key)
|
||||
block.call(key, cache[key])
|
||||
cache.delete(key)
|
||||
end
|
||||
|
@ -1,4 +1,4 @@
|
||||
require 'set'
|
||||
require 'monitor'
|
||||
|
||||
module ActiveRecord
|
||||
class Base
|
||||
@ -9,6 +9,15 @@ def initialize (config, adapter_method)
|
||||
end
|
||||
end
|
||||
|
||||
class NullMonitor
|
||||
def synchronize
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
cattr_accessor :connection_pools_lock, :instance_writer => false
|
||||
@@connection_pools_lock = NullMonitor.new
|
||||
|
||||
# Check for activity after at least +verification_timeout+ seconds.
|
||||
# Defaults to 0 (always check.)
|
||||
cattr_accessor :verification_timeout, :instance_writer => false
|
||||
@ -18,8 +27,18 @@ def initialize (config, adapter_method)
|
||||
@@connection_pools = {}
|
||||
|
||||
class << self
|
||||
def allow_concurrency=(flag)
|
||||
if @@allow_concurrency != flag
|
||||
if flag
|
||||
self.connection_pools_lock = Monitor.new
|
||||
else
|
||||
self.connection_pools_lock = NullMonitor.new
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# for internal use only
|
||||
def active_connections
|
||||
def active_connections #:nodoc:
|
||||
@@connection_pools.inject({}) do |hash,kv|
|
||||
hash[kv.first] = kv.last.active_connection
|
||||
hash.delete(kv.first) unless hash[kv.first]
|
||||
@ -36,22 +55,16 @@ def connection
|
||||
|
||||
# Clears the cache which maps classes to connections.
|
||||
def clear_active_connections!
|
||||
clear_cache!(@@connection_pools) do |name, pool|
|
||||
pool.clear_active_connections!
|
||||
end
|
||||
@@connection_pools.each_value {|pool| pool.clear_active_connections! }
|
||||
end
|
||||
|
||||
# Clears the cache which maps classes
|
||||
# Clears the cache which maps classes
|
||||
def clear_reloadable_connections!
|
||||
clear_cache!(@@connection_pools) do |name, pool|
|
||||
pool.clear_reloadable_connections!
|
||||
end
|
||||
@@connection_pools.each_value {|pool| pool.clear_reloadable_connections! }
|
||||
end
|
||||
|
||||
def clear_all_connections!
|
||||
clear_cache!(@@connection_pools) do |name, pool|
|
||||
pool.disconnect!
|
||||
end
|
||||
clear_cache!(@@connection_pools) {|name, pool| pool.disconnect! }
|
||||
end
|
||||
|
||||
# Verify active connections.
|
||||
@ -59,6 +72,9 @@ def verify_active_connections! #:nodoc:
|
||||
@@connection_pools.each_value {|pool| pool.verify_active_connections!}
|
||||
end
|
||||
|
||||
synchronize :active_connections, :clear_active_connections!, :clear_reloadable_connections!,
|
||||
:clear_all_connections!, :verify_active_connections!, :with => :connection_pools_lock
|
||||
|
||||
private
|
||||
def clear_cache!(cache, &block)
|
||||
cache.each(&block) if block_given?
|
||||
@ -107,7 +123,9 @@ def self.establish_connection(spec = nil)
|
||||
raise AdapterNotSpecified unless defined? RAILS_ENV
|
||||
establish_connection(RAILS_ENV)
|
||||
when ConnectionSpecification
|
||||
@@connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
|
||||
connection_pools_lock.synchronize do
|
||||
@@connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec)
|
||||
end
|
||||
when Symbol, String
|
||||
if configuration = configurations[spec.to_s]
|
||||
establish_connection(configuration)
|
||||
@ -171,5 +189,10 @@ def self.remove_connection(klass=self)
|
||||
pool.disconnect! if pool
|
||||
pool.spec.config if pool
|
||||
end
|
||||
|
||||
class << self
|
||||
synchronize :retrieve_connection, :retrieve_connection_pool, :connected?,
|
||||
:remove_connection, :with => :connection_pools_lock
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -11,11 +11,15 @@ class ThreadedConnectionsTest < ActiveRecord::TestCase
|
||||
def setup
|
||||
@connection = ActiveRecord::Base.remove_connection
|
||||
@connections = []
|
||||
@allow_concurrency = ActiveRecord::Base.allow_concurrency
|
||||
ActiveRecord::Base.allow_concurrency = true
|
||||
end
|
||||
|
||||
def teardown
|
||||
# clear the connection cache
|
||||
ActiveRecord::Base.clear_active_connections!
|
||||
# set allow_concurrency to saved value
|
||||
ActiveRecord::Base.allow_concurrency = @allow_concurrency
|
||||
# reestablish old connection
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user