Send disconnect message during remote disconnect

Send {type: :disconnect} message to a client before closing the connection when initiated by server.remote_connections.where(...).disconnect
This commit is contained in:
Vladimir Dementyev 2020-06-05 16:26:30 +03:00
parent 4b5410a6aa
commit 2d0f9c5844
7 changed files with 64 additions and 11 deletions

@ -1,3 +1,6 @@
* `ActionCable.server.remote_connections.where(...).disconnect` now sends `disconnect` message
before closing the connection with the reconnection strategy specified (defaults to `true`).
*Vladimir Dementyev*
Please check [7-0-stable](https://github.com/rails/rails/blob/7-0-stable/actioncable/CHANGELOG.md) for previous changes.

@ -121,7 +121,8 @@
disconnect_reasons: {
unauthorized: "unauthorized",
invalid_request: "invalid_request",
server_restart: "server_restart"
server_restart: "server_restart",
remote: "remote"
},
default_mount_path: "/cable",
protocols: [ "actioncable-v1-json", "actioncable-unsupported" ]

@ -9,7 +9,8 @@ export default {
"disconnect_reasons": {
"unauthorized": "unauthorized",
"invalid_request": "invalid_request",
"server_restart": "server_restart"
"server_restart": "server_restart",
"remote": "remote"
},
"default_mount_path": "/cable",
"protocols": [

@ -56,7 +56,8 @@ module ActionCable
disconnect_reasons: {
unauthorized: "unauthorized",
invalid_request: "invalid_request",
server_restart: "server_restart"
server_restart: "server_restart",
remote: "remote"
},
default_mount_path: "/cable",
protocols: ["actioncable-v1-json", "actioncable-unsupported"].freeze

@ -32,7 +32,7 @@ def process_internal_message(message)
case message["type"]
when "disconnect"
logger.info "Removing connection (#{connection_identifier})"
websocket.close
close(reason: ActionCable::INTERNAL[:disconnect_reasons][:remote], reconnect: message.fetch("reconnect", true))
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"

@ -19,6 +19,11 @@ module ActionCable
# This will disconnect all the connections established for
# <tt>User.find(1)</tt>, across all servers running on all machines, because
# it uses the internal channel that all of these servers are subscribed to.
#
# By default, server sends a "disconnect" message with "reconnect" flag set to true.
# You can override it by specifying the `reconnect` option:
#
# ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect(reconnect: false)
class RemoteConnections
attr_reader :server
@ -44,8 +49,8 @@ def initialize(server, ids)
end
# Uses the internal channel to disconnect the connection.
def disconnect
server.broadcast internal_channel, { type: "disconnect" }
def disconnect(reconnect: true)
server.broadcast internal_channel, { type: "disconnect", reconnect: reconnect }
end
# Returns all the identifiers that were applied to this connection.

@ -30,6 +30,14 @@ class ClientTest < ActionCable::TestCase
WAIT_WHEN_EXPECTING_EVENT = 2
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
class Connection < ActionCable::Connection::Base
identified_by :id
def connect
self.id = request.params["id"] || SecureRandom.hex(4)
end
end
class EchoChannel < ActionCable::Channel::Base
def subscribed
stream_from "global"
@ -59,6 +67,7 @@ def setup
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async")
server.config.connection_class = -> { ClientTest::Connection }
# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true
@ -102,7 +111,7 @@ def with_puma_server(rack_app = ActionCable.server, port = 3099)
class SyncClient
attr_reader :pings
def initialize(port)
def initialize(port, path = "/")
messages = @messages = Queue.new
closed = @closed = Concurrent::Event.new
has_messages = @has_messages = Concurrent::Semaphore.new(0)
@ -110,7 +119,7 @@ def initialize(port)
open = Concurrent::Promise.new
@ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws|
@ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}#{path}") do |ws|
ws.on(:error) do |event|
event = RuntimeError.new(event.message) unless event.is_a?(Exception)
@ -196,8 +205,8 @@ def closed?
end
end
def websocket_client(port)
SyncClient.new(port)
def websocket_client(*args)
SyncClient.new(*args)
end
def concurrently(enum)
@ -284,7 +293,6 @@ def test_unsubscribe_client
c.send_message command: "subscribe", identifier: identifier
assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message)
assert_equal(1, app.connections.count)
assert(app.remote_connections.where(identifier: identifier))
subscriptions = app.connections.first.subscriptions.send(:subscriptions)
assert_not_equal 0, subscriptions.size, "Missing EchoChannel subscription"
@ -299,6 +307,40 @@ def test_unsubscribe_client
end
end
def test_remote_disconnect_client
with_puma_server do |port|
app = ActionCable.server
c = websocket_client(port, "/?id=1")
assert_equal({ "type" => "welcome" }, c.read_message)
sleep 0.1 # make sure connections is registered
app.remote_connections.where(id: "1").disconnect
assert_equal({ "type" => "disconnect", "reason" => "remote", "reconnect" => true }, c.read_message)
c.wait_for_close
assert(c.closed?)
end
end
def test_remote_disconnect_client_with_reconnect
with_puma_server do |port|
app = ActionCable.server
c = websocket_client(port, "/?id=2")
assert_equal({ "type" => "welcome" }, c.read_message)
sleep 0.1 # make sure connections is registered
app.remote_connections.where(id: "2").disconnect(reconnect: false)
assert_equal({ "type" => "disconnect", "reason" => "remote", "reconnect" => false }, c.read_message)
c.wait_for_close
assert(c.closed?)
end
end
def test_server_restart
with_puma_server do |port|
c = websocket_client(port)