diff options
Diffstat (limited to 'actioncable/test/client_test.rb')
-rw-r--r-- | actioncable/test/client_test.rb | 270 |
1 files changed, 165 insertions, 105 deletions
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index fe503fd703..56b3ef143b 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -1,95 +1,151 @@ -require 'test_helper' -require 'concurrent' +# frozen_string_literal: true -require 'active_support/core_ext/hash/indifferent_access' -require 'pathname' +require "test_helper" +require "concurrent" -require 'faye/websocket' -require 'json' +require "websocket-client-simple" +require "json" + +require "active_support/hash_with_indifferent_access" + +#### +# 😷 Warning suppression 😷 +WebSocket::Frame::Handler::Handler03.prepend Module.new { + def initialize(*) + @application_data_buffer = nil + super + end +} + +WebSocket::Frame::Data.prepend Module.new { + def initialize(*) + @masking_key = nil + super + end +} +# +#### class ClientTest < ActionCable::TestCase - WAIT_WHEN_EXPECTING_EVENT = 8 + WAIT_WHEN_EXPECTING_EVENT = 2 WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5 + class EchoChannel < ActionCable::Channel::Base + def subscribed + stream_from "global" + end + + def unsubscribed + "Goodbye from EchoChannel!" + end + + def ding(data) + transmit(dong: data["message"]) + end + + def delay(data) + sleep 1 + transmit(dong: data["message"]) + end + + def bulk(data) + ActionCable.server.broadcast "global", wide: data["message"] + end + end + def setup ActionCable.instance_variable_set(:@server, nil) server = ActionCable.server server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } - server.config.cable = { adapter: 'async' }.with_indifferent_access - server.config.use_faye = ENV['FAYE'].present? + server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async") # and now the "real" setup for our test: server.config.disable_request_forgery_protection = true - server.config.channel_paths = [ File.expand_path('client/echo_channel.rb', __dir__) ] - - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - - # faye-websocket is warning-rich - @previous_verbose, $VERBOSE = $VERBOSE, nil - end - - def teardown - $VERBOSE = @previous_verbose end def with_puma_server(rack_app = ActionCable.server, port = 3099) server = ::Puma::Server.new(rack_app, ::Puma::Events.strings) - server.add_tcp_listener '127.0.0.1', port + server.add_tcp_listener "127.0.0.1", port server.min_threads = 1 server.max_threads = 4 - t = Thread.new { server.run.join } - yield port + thread = server.run - ensure - server.stop(true) if server - t.join if t + begin + yield port + + ensure + server.stop + + begin + thread.join + + rescue IOError + # Work around https://bugs.ruby-lang.org/issues/13405 + # + # Puma's sometimes raising while shutting down, when it closes + # its internal pipe. We can safely ignore that, but we do need + # to do the step skipped by the exception: + server.binder.close + + rescue RuntimeError => ex + # Work around https://bugs.ruby-lang.org/issues/13239 + raise unless ex.message =~ /can't modify frozen IOError/ + + # Handle this as if it were the IOError: do the same as above. + server.binder.close + end + end end class SyncClient attr_reader :pings def initialize(port) - @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/") - @messages = Queue.new - @closed = Concurrent::Event.new - @has_messages = Concurrent::Semaphore.new(0) - @pings = 0 - - open = Concurrent::Event.new - error = nil - - @ws.on(:error) do |event| - if open.set? - @messages << RuntimeError.new(event.message) - else - error = event.message - open.set + messages = @messages = Queue.new + closed = @closed = Concurrent::Event.new + has_messages = @has_messages = Concurrent::Semaphore.new(0) + pings = @pings = Concurrent::AtomicFixnum.new(0) + + open = Concurrent::Promise.new + + @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws| + ws.on(:error) do |event| + event = RuntimeError.new(event.message) unless event.is_a?(Exception) + + if open.pending? + open.fail(event) + else + messages << event + has_messages.release + end end - end - @ws.on(:open) do |event| - open.set - end + ws.on(:open) do |event| + open.set(true) + end - @ws.on(:message) do |event| - message = JSON.parse(event.data) - if message['type'] == 'ping' - @pings += 1 - else - @messages << message - @has_messages.release + ws.on(:message) do |event| + if event.type == :close + closed.set + else + message = JSON.parse(event.data) + if message["type"] == "ping" + pings.increment + else + messages << message + has_messages.release + end + end end - end - @ws.on(:close) do |event| - @closed.set + ws.on(:close) do |event| + closed.set + end end - open.wait(WAIT_WHEN_EXPECTING_EVENT) - raise error if error + open.wait!(WAIT_WHEN_EXPECTING_EVENT) end def read_message @@ -140,76 +196,80 @@ class ClientTest < ActionCable::TestCase end end - def faye_client(port) + def websocket_client(port) SyncClient.new(port) end + def concurrently(enum) + enum.map { |*x| Concurrent::Future.execute { yield(*x) } }.map(&:value!) + end + def test_single_client with_puma_server do |port| - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "message" => { "dong" => "hello" } }, c.read_message) c.close end end def test_interacting_clients with_puma_server do |port| - clients = 10.times.map { faye_client(port) } + clients = concurrently(10.times) { websocket_client(port) } barrier_1 = Concurrent::CyclicBarrier.new(clients.size) barrier_2 = Concurrent::CyclicBarrier.new(clients.size) - clients.map {|c| Concurrent::Future.execute { - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + concurrently(clients) do |c| + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }, c.read_message) barrier_1.wait WAIT_WHEN_EXPECTING_EVENT - c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello') + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "bulk", message: "hello") barrier_2.wait WAIT_WHEN_EXPECTING_EVENT assert_equal clients.size, c.read_messages(clients.size).size - } }.each(&:wait!) + end - clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_many_clients with_puma_server do |port| - clients = 100.times.map { faye_client(port) } - - clients.map {|c| Concurrent::Future.execute { - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) - } }.each(&:wait!) + clients = concurrently(100.times) { websocket_client(port) } + + concurrently(clients) do |c| + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }, c.read_message) + end - clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_disappearing_client with_puma_server do |port| - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello') + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "delay", message: "hello") c.close # disappear before write - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }, c.read_message) c.close # disappear before read end end @@ -217,17 +277,17 @@ class ClientTest < ActionCable::TestCase def test_unsubscribe_client with_puma_server do |port| app = ActionCable.server - identifier = JSON.generate(channel: 'EchoChannel') + identifier = JSON.generate(channel: "ClientTest::EchoChannel") - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) - c.send_message command: 'subscribe', identifier: identifier - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) + c.send_message command: "subscribe", identifier: identifier + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) assert_equal(1, app.connections.count) assert(app.remote_connections.where(identifier: identifier)) subscriptions = app.connections.first.subscriptions.send(:subscriptions) - assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription' + assert_not_equal 0, subscriptions.size, "Missing EchoChannel subscription" channel = subscriptions.first[1] channel.expects(:unsubscribed) c.close @@ -240,10 +300,10 @@ class ClientTest < ActionCable::TestCase def test_server_restart with_puma_server do |port| - c = faye_client(port) - assert_equal({"type" => "welcome"}, c.read_message) - c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }, c.read_message) ActionCable.server.restart c.wait_for_close |