Allow rejecting subscriptions from the channel
This commit is contained in:
parent
904b83b9c3
commit
0ce0cf0c04
@ -74,11 +74,12 @@ class Base
|
||||
include Broadcasting
|
||||
|
||||
SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE = 'confirm_subscription'.freeze
|
||||
SUBSCRIPTION_REJECTION_INTERNAL_MESSAGE = 'reject_subscription'.freeze
|
||||
|
||||
on_subscribe :subscribed
|
||||
on_unsubscribe :unsubscribed
|
||||
|
||||
attr_reader :params, :connection
|
||||
attr_reader :params, :connection, :identifier
|
||||
delegate :logger, to: :connection
|
||||
|
||||
class << self
|
||||
@ -170,8 +171,6 @@ def transmit(data, via: nil)
|
||||
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
|
||||
end
|
||||
|
||||
|
||||
protected
|
||||
def defer_subscription_confirmation!
|
||||
@defer_subscription_confirmation = true
|
||||
end
|
||||
@ -184,6 +183,14 @@ def subscription_confirmation_sent?
|
||||
@subscription_confirmation_sent
|
||||
end
|
||||
|
||||
def reject!
|
||||
@reject_subscription = true
|
||||
end
|
||||
|
||||
def subscription_rejected?
|
||||
@reject_subscription
|
||||
end
|
||||
|
||||
private
|
||||
def delegate_connection_identifiers
|
||||
connection.identifiers.each do |identifier|
|
||||
@ -196,7 +203,12 @@ def delegate_connection_identifiers
|
||||
|
||||
def subscribe_to_channel
|
||||
run_subscribe_callbacks
|
||||
transmit_subscription_confirmation unless defer_subscription_confirmation?
|
||||
|
||||
if subscription_rejected?
|
||||
reject_subscription
|
||||
else
|
||||
transmit_subscription_confirmation unless defer_subscription_confirmation?
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@ -243,6 +255,16 @@ def transmit_subscription_confirmation
|
||||
end
|
||||
end
|
||||
|
||||
def reject_subscription
|
||||
connection.subscriptions.remove_subscription self
|
||||
transmit_subscription_rejection
|
||||
end
|
||||
|
||||
def transmit_subscription_rejection
|
||||
logger.info "#{self.class.name} is transmitting the subscription rejection"
|
||||
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: SUBSCRIPTION_REJECTION_INTERNAL_MESSAGE)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -48,7 +48,7 @@ class Base
|
||||
include InternalChannel
|
||||
include Authorization
|
||||
|
||||
attr_reader :server, :env
|
||||
attr_reader :server, :env, :subscriptions
|
||||
delegate :worker_pool, :pubsub, to: :server
|
||||
|
||||
attr_reader :logger
|
||||
@ -140,7 +140,7 @@ def cookies
|
||||
|
||||
private
|
||||
attr_reader :websocket
|
||||
attr_reader :subscriptions, :message_buffer
|
||||
attr_reader :message_buffer
|
||||
|
||||
def on_open
|
||||
connect if respond_to?(:connect)
|
||||
|
@ -37,8 +37,12 @@ def add(data)
|
||||
|
||||
def remove(data)
|
||||
logger.info "Unsubscribing from channel: #{data['identifier']}"
|
||||
subscriptions[data['identifier']].unsubscribe_from_channel
|
||||
subscriptions.delete(data['identifier'])
|
||||
remove_subscription subscriptions[data['identifier']]
|
||||
end
|
||||
|
||||
def remove_subscription(subscription)
|
||||
subscription.unsubscribe_from_channel
|
||||
subscriptions.delete(subscription.identifier)
|
||||
end
|
||||
|
||||
def perform_action(data)
|
||||
|
@ -5,6 +5,7 @@
|
||||
PING_IDENTIFIER: "_ping"
|
||||
INTERNAL_MESSAGES:
|
||||
SUBSCRIPTION_CONFIRMATION: 'confirm_subscription'
|
||||
SUBSCRIPTION_REJECTION: 'reject_subscription'
|
||||
|
||||
createConsumer: (url) ->
|
||||
new Cable.Consumer url
|
||||
|
@ -58,6 +58,8 @@ class Cable.Connection
|
||||
switch type
|
||||
when Cable.INTERNAL_MESSAGES.SUBSCRIPTION_CONFIRMATION
|
||||
@consumer.subscriptions.notify(identifier, "connected")
|
||||
when Cable.INTERNAL_MESSAGES.SUBSCRIPTION_REJECTION
|
||||
@consumer.subscriptions.rejectSubscription(identifier)
|
||||
else
|
||||
@consumer.subscriptions.notify(identifier, "received", message)
|
||||
|
||||
|
@ -27,11 +27,22 @@ class Cable.Subscriptions
|
||||
for subscription in @subscriptions
|
||||
@sendCommand(subscription, "subscribe")
|
||||
|
||||
rejectSubscription: (identifier) ->
|
||||
subscriptions = @findAll(identifier)
|
||||
|
||||
for subscription in subscriptions
|
||||
@removeSubscription(subscription)
|
||||
@notify(subscription, "rejected")
|
||||
|
||||
remove: (subscription) ->
|
||||
@subscriptions = (s for s in @subscriptions when s isnt subscription)
|
||||
@removeSubscription(subscription)
|
||||
|
||||
unless @findAll(subscription.identifier).length
|
||||
@sendCommand(subscription, "unsubscribe")
|
||||
|
||||
removeSubscription: (subscription) ->
|
||||
@subscriptions = (s for s in @subscriptions when s isnt subscription)
|
||||
|
||||
findAll: (identifier) ->
|
||||
s for s in @subscriptions when s.identifier is identifier
|
||||
|
||||
@ -48,7 +59,7 @@ class Cable.Subscriptions
|
||||
for subscription in subscriptions
|
||||
subscription[callbackName]?(args...)
|
||||
|
||||
if callbackName in ["initialized", "connected", "disconnected"]
|
||||
if callbackName in ["initialized", "connected", "disconnected", "rejected"]
|
||||
{identifier} = subscription
|
||||
@record(notification: {identifier, callbackName, args})
|
||||
|
||||
|
25
test/channel/rejection_test.rb
Normal file
25
test/channel/rejection_test.rb
Normal file
@ -0,0 +1,25 @@
|
||||
require 'test_helper'
|
||||
require 'stubs/test_connection'
|
||||
require 'stubs/room'
|
||||
|
||||
class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
|
||||
class SecretChannel < ActionCable::Channel::Base
|
||||
def subscribed
|
||||
reject! if params[:id] > 0
|
||||
end
|
||||
end
|
||||
|
||||
setup do
|
||||
@user = User.new "lifo"
|
||||
@connection = TestConnection.new(@user)
|
||||
end
|
||||
|
||||
test "subscription rejection" do
|
||||
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
|
||||
@channel = SecretChannel.new @connection, "{id: 1}", { id: 1 }
|
||||
|
||||
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "reject_subscription"
|
||||
assert_equal expected, @connection.last_transmission
|
||||
end
|
||||
|
||||
end
|
Loading…
Reference in New Issue
Block a user