aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/test/client_test.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/test/client_test.rb')
-rw-r--r--actioncable/test/client_test.rb102
1 files changed, 77 insertions, 25 deletions
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
index d30c381131..fe503fd703 100644
--- a/actioncable/test/client_test.rb
+++ b/actioncable/test/client_test.rb
@@ -8,8 +8,8 @@ require 'faye/websocket'
require 'json'
class ClientTest < ActionCable::TestCase
- WAIT_WHEN_EXPECTING_EVENT = 3
- WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+ WAIT_WHEN_EXPECTING_EVENT = 8
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
def setup
ActionCable.instance_variable_set(:@server, nil)
@@ -17,6 +17,7 @@ class ClientTest < ActionCable::TestCase
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
server.config.cable = { adapter: 'async' }.with_indifferent_access
+ server.config.use_faye = ENV['FAYE'].present?
# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true
@@ -54,7 +55,7 @@ class ClientTest < ActionCable::TestCase
@ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
@messages = Queue.new
@closed = Concurrent::Event.new
- @has_messages = Concurrent::Event.new
+ @has_messages = Concurrent::Semaphore.new(0)
@pings = 0
open = Concurrent::Event.new
@@ -74,12 +75,12 @@ class ClientTest < ActionCable::TestCase
end
@ws.on(:message) do |event|
- hash = JSON.parse(event.data)
- if hash['identifier'] == '_ping'
+ message = JSON.parse(event.data)
+ if message['type'] == 'ping'
@pings += 1
else
- @messages << hash
- @has_messages.set
+ @messages << message
+ @has_messages.release
end
end
@@ -92,8 +93,7 @@ class ClientTest < ActionCable::TestCase
end
def read_message
- @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty?
- @has_messages.reset if @messages.size < 2
+ @has_messages.try_acquire(1, WAIT_WHEN_EXPECTING_EVENT)
msg = @messages.pop(true)
raise msg if msg.is_a?(Exception)
@@ -104,9 +104,11 @@ class ClientTest < ActionCable::TestCase
def read_messages(expected_size = 0)
list = []
loop do
- @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT)
- if @has_messages.set?
- list << read_message
+ if @has_messages.try_acquire(1, list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT)
+ msg = @messages.pop(true)
+ raise msg if msg.is_a?(Exception)
+
+ list << msg
else
break
end
@@ -114,8 +116,8 @@ class ClientTest < ActionCable::TestCase
list
end
- def send_message(hash)
- @ws.send(JSON.dump(hash))
+ def send_message(message)
+ @ws.send(JSON.generate(message))
end
def close
@@ -126,8 +128,16 @@ class ClientTest < ActionCable::TestCase
end
@ws.close
+ wait_for_close
+ end
+
+ def wait_for_close
@closed.wait(WAIT_WHEN_EXPECTING_EVENT)
end
+
+ def closed?
+ @closed.set?
+ end
end
def faye_client(port)
@@ -137,9 +147,10 @@ class ClientTest < ActionCable::TestCase
def test_single_client
with_puma_server do |port|
c = faye_client(port)
- c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
c.close
end
@@ -153,12 +164,13 @@ class ClientTest < ActionCable::TestCase
barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
clients.map {|c| Concurrent::Future.execute {
- c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
- c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello')
+ c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello')
barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
assert_equal clients.size, c.read_messages(clients.size).size
} }.each(&:wait!)
@@ -172,9 +184,10 @@ class ClientTest < ActionCable::TestCase
clients = 100.times.map { faye_client(port) }
clients.map {|c| Concurrent::Future.execute {
- c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
} }.each(&:wait!)
@@ -185,17 +198,56 @@ class ClientTest < ActionCable::TestCase
def test_disappearing_client
with_puma_server do |port|
c = faye_client(port)
- c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
+ c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello')
c.close # disappear before write
c = faye_client(port)
- c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
c.close # disappear before read
end
end
+
+ def test_unsubscribe_client
+ with_puma_server do |port|
+ app = ActionCable.server
+ identifier = JSON.generate(channel: 'EchoChannel')
+
+ c = faye_client(port)
+ assert_equal({"type" => "welcome"}, c.read_message)
+ c.send_message command: 'subscribe', identifier: identifier
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ assert_equal(1, app.connections.count)
+ assert(app.remote_connections.where(identifier: identifier))
+
+ subscriptions = app.connections.first.subscriptions.send(:subscriptions)
+ assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription'
+ channel = subscriptions.first[1]
+ channel.expects(:unsubscribed)
+ c.close
+ sleep 0.1 # Data takes a moment to process
+
+ # All data is removed: No more connection or subscription information!
+ assert_equal(0, app.connections.count)
+ end
+ end
+
+ def test_server_restart
+ with_puma_server do |port|
+ c = faye_client(port)
+ assert_equal({"type" => "welcome"}, c.read_message)
+ c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+
+ ActionCable.server.restart
+ c.wait_for_close
+ assert c.closed?
+ end
+ end
end