aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/test/client_test.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/test/client_test.rb')
-rw-r--r--actioncable/test/client_test.rb221
1 files changed, 118 insertions, 103 deletions
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
index f4b4a53aea..db10a7ad16 100644
--- a/actioncable/test/client_test.rb
+++ b/actioncable/test/client_test.rb
@@ -1,14 +1,31 @@
-require 'test_helper'
-require 'concurrent'
+require "test_helper"
+require "concurrent"
-require 'active_support/core_ext/hash/indifferent_access'
-require 'pathname'
+require "websocket-client-simple"
+require "json"
-require 'faye/websocket'
-require 'json'
+require "active_support/hash_with_indifferent_access"
+
+####
+# 😷 Warning suppression 😷
+WebSocket::Frame::Handler::Handler03.prepend Module.new {
+ def initialize(*)
+ @application_data_buffer = nil
+ super
+ end
+}
+
+WebSocket::Frame::Data.prepend Module.new {
+ def initialize(*)
+ @masking_key = nil
+ super
+ end
+}
+#
+####
class ClientTest < ActionCable::TestCase
- WAIT_WHEN_EXPECTING_EVENT = 8
+ WAIT_WHEN_EXPECTING_EVENT = 2
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
class EchoChannel < ActionCable::Channel::Base
@@ -17,20 +34,20 @@ class ClientTest < ActionCable::TestCase
end
def unsubscribed
- 'Goodbye from EchoChannel!'
+ "Goodbye from EchoChannel!"
end
def ding(data)
- transmit(dong: data['message'])
+ transmit(dong: data["message"])
end
def delay(data)
sleep 1
- transmit(dong: data['message'])
+ transmit(dong: data["message"])
end
def bulk(data)
- ActionCable.server.broadcast "global", wide: data['message']
+ ActionCable.server.broadcast "global", wide: data["message"]
end
end
@@ -39,26 +56,15 @@ class ClientTest < ActionCable::TestCase
server = ActionCable.server
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
- server.config.cable = { adapter: 'async' }.with_indifferent_access
- server.config.use_faye = ENV['FAYE'].present?
+ server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async")
# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true
-
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
- Thread.pass until EventMachine.reactor_running?
-
- # faye-websocket is warning-rich
- @previous_verbose, $VERBOSE = $VERBOSE, nil
- end
-
- def teardown
- $VERBOSE = @previous_verbose
end
def with_puma_server(rack_app = ActionCable.server, port = 3099)
server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
- server.add_tcp_listener '127.0.0.1', port
+ server.add_tcp_listener "127.0.0.1", port
server.min_threads = 1
server.max_threads = 4
@@ -74,44 +80,49 @@ class ClientTest < ActionCable::TestCase
attr_reader :pings
def initialize(port)
- @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
- @messages = Queue.new
- @closed = Concurrent::Event.new
- @has_messages = Concurrent::Semaphore.new(0)
- @pings = 0
-
- open = Concurrent::Event.new
- error = nil
-
- @ws.on(:error) do |event|
- if open.set?
- @messages << RuntimeError.new(event.message)
- else
- error = event.message
- open.set
+ messages = @messages = Queue.new
+ closed = @closed = Concurrent::Event.new
+ has_messages = @has_messages = Concurrent::Semaphore.new(0)
+ pings = @pings = Concurrent::AtomicFixnum.new(0)
+
+ open = Concurrent::Promise.new
+
+ @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws|
+ ws.on(:error) do |event|
+ event = RuntimeError.new(event.message) unless event.is_a?(Exception)
+
+ if open.pending?
+ open.fail(event)
+ else
+ messages << event
+ has_messages.release
+ end
end
- end
- @ws.on(:open) do |event|
- open.set
- end
+ ws.on(:open) do |event|
+ open.set(true)
+ end
- @ws.on(:message) do |event|
- message = JSON.parse(event.data)
- if message['type'] == 'ping'
- @pings += 1
- else
- @messages << message
- @has_messages.release
+ ws.on(:message) do |event|
+ if event.type == :close
+ closed.set
+ else
+ message = JSON.parse(event.data)
+ if message["type"] == "ping"
+ pings.increment
+ else
+ messages << message
+ has_messages.release
+ end
+ end
end
- end
- @ws.on(:close) do |event|
- @closed.set
+ ws.on(:close) do |event|
+ closed.set
+ end
end
- open.wait(WAIT_WHEN_EXPECTING_EVENT)
- raise error if error
+ open.wait!(WAIT_WHEN_EXPECTING_EVENT)
end
def read_message
@@ -162,76 +173,80 @@ class ClientTest < ActionCable::TestCase
end
end
- def faye_client(port)
+ def websocket_client(port)
SyncClient.new(port)
end
+ def concurrently(enum)
+ enum.map { |*x| Concurrent::Future.execute { yield(*x) } }.map(&:value!)
+ end
+
def test_single_client
with_puma_server do |port|
- c = faye_client(port)
- assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
- c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
- assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
- assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
+ c = websocket_client(port)
+ assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
+ assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
+ c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello")
+ assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "message"=>{ "dong"=>"hello" } }, c.read_message)
c.close
end
end
def test_interacting_clients
with_puma_server do |port|
- clients = 10.times.map { faye_client(port) }
+ clients = concurrently(10.times) { websocket_client(port) }
barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
- clients.map {|c| Concurrent::Future.execute {
- assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
- c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
- assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
- assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ concurrently(clients) do |c|
+ assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
+ assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message)
+ c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello")
+ assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message)
barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
- c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello')
+ c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "bulk", message: "hello")
barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
assert_equal clients.size, c.read_messages(clients.size).size
- } }.each(&:wait!)
+ end
- clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ concurrently(clients, &:close)
end
end
def test_many_clients
with_puma_server do |port|
- clients = 100.times.map { faye_client(port) }
-
- clients.map {|c| Concurrent::Future.execute {
- assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
- c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
- assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
- assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
- } }.each(&:wait!)
+ clients = concurrently(100.times) { websocket_client(port) }
+
+ concurrently(clients) do |c|
+ assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
+ assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message)
+ c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello")
+ assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message)
+ end
- clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ concurrently(clients, &:close)
end
end
def test_disappearing_client
with_puma_server do |port|
- c = faye_client(port)
- assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
- c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
- assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello')
+ c = websocket_client(port)
+ assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
+ assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
+ c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "delay", message: "hello")
c.close # disappear before write
- c = faye_client(port)
- assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
- c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
- assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
- c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
- assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ c = websocket_client(port)
+ assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack
+ c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
+ assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
+ c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello")
+ assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message)
c.close # disappear before read
end
end
@@ -239,17 +254,17 @@ class ClientTest < ActionCable::TestCase
def test_unsubscribe_client
with_puma_server do |port|
app = ActionCable.server
- identifier = JSON.generate(channel: 'ClientTest::EchoChannel')
+ identifier = JSON.generate(channel: "ClientTest::EchoChannel")
- c = faye_client(port)
- assert_equal({"type" => "welcome"}, c.read_message)
- c.send_message command: 'subscribe', identifier: identifier
- assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c = websocket_client(port)
+ assert_equal({ "type" => "welcome" }, c.read_message)
+ c.send_message command: "subscribe", identifier: identifier
+ assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
assert_equal(1, app.connections.count)
assert(app.remote_connections.where(identifier: identifier))
subscriptions = app.connections.first.subscriptions.send(:subscriptions)
- assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription'
+ assert_not_equal 0, subscriptions.size, "Missing EchoChannel subscription"
channel = subscriptions.first[1]
channel.expects(:unsubscribed)
c.close
@@ -262,10 +277,10 @@ class ClientTest < ActionCable::TestCase
def test_server_restart
with_puma_server do |port|
- c = faye_client(port)
- assert_equal({"type" => "welcome"}, c.read_message)
- c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
- assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c = websocket_client(port)
+ assert_equal({ "type" => "welcome" }, c.read_message)
+ c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
+ assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message)
ActionCable.server.restart
c.wait_for_close