Split internal subscriber tracking from Postgres adapter

This commit is contained in:
Matthew Draper 2016-01-22 11:13:12 +10:30
parent 83d2c39d5e
commit dccc15d403
3 changed files with 70 additions and 24 deletions

@ -1,5 +1,8 @@
module ActionCable module ActionCable
module SubscriptionAdapter module SubscriptionAdapter
autoload :Base, 'action_cable/subscription_adapter/base' extend ActiveSupport::Autoload
autoload :Base
autoload :SubscriberMap
end end
end end

@ -12,11 +12,11 @@ def broadcast(channel, payload)
end end
def subscribe(channel, callback, success_callback = nil) def subscribe(channel, callback, success_callback = nil)
listener.subscribe_to(channel, callback, success_callback) listener.add_subscriber(channel, callback, success_callback)
end end
def unsubscribe(channel, callback) def unsubscribe(channel, callback)
listener.unsubscribe_from(channel, callback) listener.remove_subscriber(channel, callback)
end end
def with_connection(&block) # :nodoc: def with_connection(&block) # :nodoc:
@ -36,11 +36,11 @@ def listener
@listener ||= Listener.new(self) @listener ||= Listener.new(self)
end end
class Listener class Listener < SubscriberMap
def initialize(adapter) def initialize(adapter)
super()
@adapter = adapter @adapter = adapter
@subscribers = Hash.new { |h,k| h[k] = [] }
@sync = Mutex.new
@queue = Queue.new @queue = Queue.new
Thread.new do Thread.new do
@ -65,32 +65,22 @@ def listen
end end
pg_conn.wait_for_notify(1) do |chan, pid, message| pg_conn.wait_for_notify(1) do |chan, pid, message|
@subscribers[chan].each do |callback| broadcast(chan, message)
::EM.next_tick { callback.call(message) }
end
end end
end end
end end
end end
def subscribe_to(channel, callback, success_callback) def add_channel(channel, on_success)
@sync.synchronize do @queue.push([:listen, channel, on_success])
if @subscribers[channel].empty?
@queue.push([:listen, channel, success_callback])
end end
@subscribers[channel] << callback def remove_channel(channel)
end
end
def unsubscribe_from(channel, callback)
@sync.synchronize do
@subscribers[channel].delete(callback)
if @subscribers[channel].empty?
@queue.push([:unlisten, channel]) @queue.push([:unlisten, channel])
end end
end
def invoke_callback(*)
::EM.next_tick { super }
end end
end end
end end

@ -0,0 +1,53 @@
module ActionCable
module SubscriptionAdapter
class SubscriberMap
def initialize
@subscribers = Hash.new { |h,k| h[k] = [] }
@sync = Mutex.new
end
def add_subscriber(channel, subscriber, on_success)
@sync.synchronize do
new_channel = !@subscribers.key?(channel)
@subscribers[channel] << subscriber
if new_channel
add_channel channel, on_success
elsif on_success
on_success.call
end
end
end
def remove_subscriber(channel, subscriber)
@sync.synchronize do
@subscribers[channel].delete(subscriber)
if @subscribers[channel].empty?
@subscribers.delete channel
remove_channel channel
end
end
end
def broadcast(channel, message)
list = @sync.synchronize { @subscribers[channel].dup }
list.each do |subscriber|
invoke_callback(subscriber, message)
end
end
def add_channel(channel, on_success)
on_success.call if on_success
end
def remove_channel(channel)
end
def invoke_callback(callback, message)
callback.call message
end
end
end
end