Nearing the finish line. Initial fixed-size connection pool implemented, more docs
This commit is contained in:
parent
82fcd9d85f
commit
fe575dd4a9
12
README.rdoc
12
README.rdoc
@ -20,15 +20,5 @@ be more thread-safe, and to add connection pooling features.
|
||||
|
||||
Remaining tasks:
|
||||
|
||||
- Fixed-size connection pool
|
||||
- Add checkin/checkout API
|
||||
- Add a #with_connection API that allows checkin/checkout of a connection outside of a provided block.
|
||||
|
||||
Model.with_connection do |conn|
|
||||
Thread.new {
|
||||
Model.connection = conn
|
||||
# do something with conn
|
||||
}
|
||||
Model.connection.select ....
|
||||
end
|
||||
- Review and put the thing to real work.
|
||||
- Look at whether existing clear_* or verify_* methods can be deprecated or removed.
|
||||
|
@ -2,14 +2,43 @@
|
||||
require 'set'
|
||||
|
||||
module ActiveRecord
|
||||
# Raised when a connection could not be obtained within the connection
|
||||
# acquisition timeout period.
|
||||
class ConnectionTimeoutError < ConnectionNotEstablished
|
||||
end
|
||||
|
||||
module ConnectionAdapters
|
||||
# Connection pool base class and ActiveRecord database connections.
|
||||
class ConnectionPool
|
||||
# Connection pool base class for managing ActiveRecord database
|
||||
# connections.
|
||||
#
|
||||
# Connections can be obtained and used from a connection pool in several
|
||||
# ways:
|
||||
#
|
||||
# 1. Simply use ActiveRecord::Base.connection as in pre-connection-pooled
|
||||
# ActiveRecord. Eventually, when you're done with the connection and
|
||||
# wish it to be returned to the pool, you call
|
||||
# ActiveRecord::Base.connection_pool.release_thread_connection.
|
||||
# 2. Manually check out a connection from the pool with
|
||||
# ActiveRecord::Base.connection_pool.checkout. You are responsible for
|
||||
# returning this connection to the pool when finished by calling
|
||||
# ActiveRecord::Base.connection_pool.checkin(connection).
|
||||
# 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
|
||||
# obtains a connection, yields it as the sole argument to the block,
|
||||
# and returns it to the pool after the block completes.
|
||||
class AbstractConnectionPool
|
||||
# Factory method for connection pools.
|
||||
# Determines pool type to use based on contents of connection specification.
|
||||
# FIXME: specification configuration TBD.
|
||||
# Determines pool type to use based on contents of connection
|
||||
# specification. Additional options for connection specification:
|
||||
#
|
||||
# * +pool+: number indicating size of fixed connection pool to use
|
||||
# * +wait_timeout+ (optional): number of seconds to block and wait
|
||||
# for a connection before giving up and raising a timeout error.
|
||||
def self.create(spec)
|
||||
ConnectionPerThread.new(spec)
|
||||
if spec.config[:pool] && spec.config[:pool].to_i > 0
|
||||
ConnectionPool.new(spec)
|
||||
else
|
||||
ConnectionPerThread.new(spec)
|
||||
end
|
||||
end
|
||||
|
||||
delegate :verification_timeout, :to => "::ActiveRecord::Base"
|
||||
@ -115,11 +144,16 @@ def connections
|
||||
end
|
||||
private :connections
|
||||
|
||||
synchronize :connection, :release_thread_connection, :checkout, :checkin,
|
||||
synchronize :connection, :release_thread_connection,
|
||||
:clear_reloadable_connections!, :verify_active_connections!,
|
||||
:connected?, :disconnect!, :with => :@connection_mutex
|
||||
|
||||
private
|
||||
def new_connection
|
||||
config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
|
||||
ActiveRecord::Base.send(spec.adapter_method, config)
|
||||
end
|
||||
|
||||
def active_connection_name #:nodoc:
|
||||
Thread.current.object_id
|
||||
end
|
||||
@ -139,7 +173,10 @@ def remove_stale_cached_threads!(cache, &block)
|
||||
end
|
||||
end
|
||||
|
||||
class ConnectionPerThread < ConnectionPool
|
||||
# ConnectionPerThread is a simple implementation: always create/disconnect
|
||||
# on checkout/checkin, and use the base class reserved connections hash to
|
||||
# manage the per-thread connections.
|
||||
class ConnectionPerThread < AbstractConnectionPool
|
||||
def active_connection
|
||||
@reserved_connections[active_connection_name]
|
||||
end
|
||||
@ -155,12 +192,6 @@ def checkin(conn)
|
||||
end
|
||||
|
||||
private
|
||||
# Set the connection for the class.
|
||||
def new_connection
|
||||
config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
|
||||
ActiveRecord::Base.send(spec.adapter_method, config)
|
||||
end
|
||||
|
||||
def connections
|
||||
@reserved_connections.values
|
||||
end
|
||||
@ -170,6 +201,70 @@ def remove_connection(conn)
|
||||
end
|
||||
end
|
||||
|
||||
# ConnectionPool provides a full, fixed-size connection pool with timed
|
||||
# waits when the pool is exhausted.
|
||||
class ConnectionPool < AbstractConnectionPool
|
||||
def initialize(spec)
|
||||
super
|
||||
# default 5 second timeout
|
||||
@timeout = spec.config[:wait_timeout] || 5
|
||||
@size = spec.config[:pool].to_i
|
||||
@queue = @connection_mutex.new_cond
|
||||
@connections = []
|
||||
@checked_out = []
|
||||
end
|
||||
|
||||
def checkout
|
||||
# Checkout an available connection
|
||||
conn = @connection_mutex.synchronize do
|
||||
if @connections.length < @size
|
||||
checkout_new_connection
|
||||
elsif @checked_out.size < @connections.size
|
||||
checkout_existing_connection
|
||||
end
|
||||
end
|
||||
return conn if conn
|
||||
|
||||
# No connections available; wait for one
|
||||
@connection_mutex.synchronize do
|
||||
if @queue.wait(@timeout)
|
||||
checkout_existing_connection
|
||||
else
|
||||
raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def checkin(conn)
|
||||
@connection_mutex.synchronize do
|
||||
@checked_out -= conn
|
||||
@queue.signal
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def checkout_new_connection
|
||||
c = new_connection
|
||||
@connections << c
|
||||
@checked_out << c
|
||||
c
|
||||
end
|
||||
|
||||
def checkout_existing_connection
|
||||
c = [@connections - @checked_out].first
|
||||
@checked_out << c
|
||||
c
|
||||
end
|
||||
|
||||
def connections
|
||||
@connections
|
||||
end
|
||||
|
||||
def remove_connection(conn)
|
||||
@connections.delete conn
|
||||
end
|
||||
end
|
||||
|
||||
module ConnectionHandlerMethods
|
||||
def initialize(pools = {})
|
||||
@connection_pools = pools
|
||||
@ -180,7 +275,7 @@ def connection_pools
|
||||
end
|
||||
|
||||
def establish_connection(name, spec)
|
||||
@connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec)
|
||||
@connection_pools[name] = ConnectionAdapters::AbstractConnectionPool.create(spec)
|
||||
end
|
||||
|
||||
# for internal use only and for testing;
|
||||
@ -238,15 +333,14 @@ def remove_connection(klass)
|
||||
pool.spec.config if pool
|
||||
end
|
||||
|
||||
private
|
||||
def retrieve_connection_pool(klass)
|
||||
loop do
|
||||
pool = @connection_pools[klass.name]
|
||||
return pool if pool
|
||||
return nil if ActiveRecord::Base == klass
|
||||
klass = klass.superclass
|
||||
end
|
||||
def retrieve_connection_pool(klass)
|
||||
loop do
|
||||
pool = @connection_pools[klass.name]
|
||||
return pool if pool
|
||||
return nil if ActiveRecord::Base == klass
|
||||
klass = klass.superclass
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# This connection handler is not thread-safe, as it does not protect access
|
||||
|
@ -116,6 +116,10 @@ def connection
|
||||
retrieve_connection
|
||||
end
|
||||
|
||||
def connection_pool
|
||||
connection_handler.retrieve_connection_pool(self)
|
||||
end
|
||||
|
||||
def retrieve_connection
|
||||
connection_handler.retrieve_connection(self)
|
||||
end
|
||||
|
@ -40,4 +40,40 @@ def test_threaded_connections
|
||||
assert_equal @connections.length, 5
|
||||
end
|
||||
end
|
||||
|
||||
class PooledConnectionsTest < ActiveRecord::TestCase
|
||||
def setup
|
||||
@connection = ActiveRecord::Base.remove_connection
|
||||
@connections = []
|
||||
@allow_concurrency = ActiveRecord::Base.allow_concurrency
|
||||
ActiveRecord::Base.allow_concurrency = true
|
||||
end
|
||||
|
||||
def teardown
|
||||
ActiveRecord::Base.clear_all_connections!
|
||||
ActiveRecord::Base.allow_concurrency = @allow_concurrency
|
||||
ActiveRecord::Base.establish_connection(@connection)
|
||||
end
|
||||
|
||||
def gather_connections
|
||||
ActiveRecord::Base.establish_connection(@connection.merge({:pool => 2, :wait_timeout => 0.3}))
|
||||
@timed_out = 0
|
||||
|
||||
4.times do
|
||||
Thread.new do
|
||||
begin
|
||||
@connections << ActiveRecord::Base.connection_pool.checkout
|
||||
rescue ActiveRecord::ConnectionTimeoutError
|
||||
@timed_out += 1
|
||||
end
|
||||
end.join
|
||||
end
|
||||
end
|
||||
|
||||
def test_threaded_connections
|
||||
gather_connections
|
||||
assert_equal @connections.length, 2
|
||||
assert_equal @timed_out, 2
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user