diff options
Diffstat (limited to 'actioncable/test/client_test.rb')
-rw-r--r-- | actioncable/test/client_test.rb | 291 |
1 files changed, 181 insertions, 110 deletions
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 4ade9832e0..db10a7ad16 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -1,59 +1,70 @@ -require 'test_helper' -require 'concurrent' +require "test_helper" +require "concurrent" -require 'active_support/core_ext/hash/indifferent_access' -require 'pathname' +require "websocket-client-simple" +require "json" -require 'faye/websocket' -require 'json' +require "active_support/hash_with_indifferent_access" -class ClientTest < ActionCable::TestCase - WAIT_WHEN_EXPECTING_EVENT = 3 - WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2 +#### +# 😷 Warning suppression 😷 +WebSocket::Frame::Handler::Handler03.prepend Module.new { + def initialize(*) + @application_data_buffer = nil + super + end +} - def setup - # TODO: ActionCable requires a *lot* of setup at the moment... - ::Object.const_set(:ApplicationCable, Module.new) - ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base)) +WebSocket::Frame::Data.prepend Module.new { + def initialize(*) + @masking_key = nil + super + end +} +# +#### - ::Object.const_set(:Rails, Module.new) - ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) } +class ClientTest < ActionCable::TestCase + WAIT_WHEN_EXPECTING_EVENT = 2 + WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5 - ActionCable.instance_variable_set(:@server, nil) - server = ActionCable.server - server.config = ActionCable::Server::Configuration.new - inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } - server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: []) + class EchoChannel < ActionCable::Channel::Base + def subscribed + stream_from "global" + end - server.config.cable = { adapter: 'async' }.with_indifferent_access + def unsubscribed + "Goodbye from EchoChannel!" + end - # and now the "real" setup for our test: - server.config.disable_request_forgery_protection = true - server.config.channel_load_paths = [File.expand_path('client', __dir__)] + def ding(data) + transmit(dong: data["message"]) + end - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? + def delay(data) + sleep 1 + transmit(dong: data["message"]) + end - # faye-websocket is warning-rich - @previous_verbose, $VERBOSE = $VERBOSE, nil + def bulk(data) + ActionCable.server.broadcast "global", wide: data["message"] + end end - def teardown - $VERBOSE = @previous_verbose + def setup + ActionCable.instance_variable_set(:@server, nil) + server = ActionCable.server + server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } - begin - ::Object.send(:remove_const, :ApplicationCable) - rescue NameError - end - begin - ::Object.send(:remove_const, :Rails) - rescue NameError - end + server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async") + + # and now the "real" setup for our test: + server.config.disable_request_forgery_protection = true end def with_puma_server(rack_app = ActionCable.server, port = 3099) server = ::Puma::Server.new(rack_app, ::Puma::Events.strings) - server.add_tcp_listener '127.0.0.1', port + server.add_tcp_listener "127.0.0.1", port server.min_threads = 1 server.max_threads = 4 @@ -69,49 +80,53 @@ class ClientTest < ActionCable::TestCase attr_reader :pings def initialize(port) - @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/") - @messages = Queue.new - @closed = Concurrent::Event.new - @has_messages = Concurrent::Event.new - @pings = 0 - - open = Concurrent::Event.new - error = nil - - @ws.on(:error) do |event| - if open.set? - @messages << RuntimeError.new(event.message) - else - error = event.message - open.set + messages = @messages = Queue.new + closed = @closed = Concurrent::Event.new + has_messages = @has_messages = Concurrent::Semaphore.new(0) + pings = @pings = Concurrent::AtomicFixnum.new(0) + + open = Concurrent::Promise.new + + @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws| + ws.on(:error) do |event| + event = RuntimeError.new(event.message) unless event.is_a?(Exception) + + if open.pending? + open.fail(event) + else + messages << event + has_messages.release + end end - end - @ws.on(:open) do |event| - open.set - end + ws.on(:open) do |event| + open.set(true) + end - @ws.on(:message) do |event| - hash = JSON.parse(event.data) - if hash['identifier'] == '_ping' - @pings += 1 - else - @messages << hash - @has_messages.set + ws.on(:message) do |event| + if event.type == :close + closed.set + else + message = JSON.parse(event.data) + if message["type"] == "ping" + pings.increment + else + messages << message + has_messages.release + end + end end - end - @ws.on(:close) do |event| - @closed.set + ws.on(:close) do |event| + closed.set + end end - open.wait(WAIT_WHEN_EXPECTING_EVENT) - raise error if error + open.wait!(WAIT_WHEN_EXPECTING_EVENT) end def read_message - @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty? - @has_messages.reset if @messages.size < 2 + @has_messages.try_acquire(1, WAIT_WHEN_EXPECTING_EVENT) msg = @messages.pop(true) raise msg if msg.is_a?(Exception) @@ -122,9 +137,11 @@ class ClientTest < ActionCable::TestCase def read_messages(expected_size = 0) list = [] loop do - @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT) - if @has_messages.set? - list << read_message + if @has_messages.try_acquire(1, list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT) + msg = @messages.pop(true) + raise msg if msg.is_a?(Exception) + + list << msg else break end @@ -132,8 +149,8 @@ class ClientTest < ActionCable::TestCase list end - def send_message(hash) - @ws.send(JSON.dump(hash)) + def send_message(message) + @ws.send(JSON.generate(message)) end def close @@ -144,76 +161,130 @@ class ClientTest < ActionCable::TestCase end @ws.close + wait_for_close + end + + def wait_for_close @closed.wait(WAIT_WHEN_EXPECTING_EVENT) end + + def closed? + @closed.set? + end end - def faye_client(port) + def websocket_client(port) SyncClient.new(port) end + def concurrently(enum) + enum.map { |*x| Concurrent::Future.execute { yield(*x) } }.map(&:value!) + end + def test_single_client with_puma_server do |port| - c = faye_client(port) - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "message"=>{ "dong"=>"hello" } }, c.read_message) c.close end end def test_interacting_clients with_puma_server do |port| - clients = 10.times.map { faye_client(port) } + clients = concurrently(10.times) { websocket_client(port) } barrier_1 = Concurrent::CyclicBarrier.new(clients.size) barrier_2 = Concurrent::CyclicBarrier.new(clients.size) - clients.map {|c| Concurrent::Future.execute { - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + concurrently(clients) do |c| + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message) barrier_1.wait WAIT_WHEN_EXPECTING_EVENT - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello') + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "bulk", message: "hello") barrier_2.wait WAIT_WHEN_EXPECTING_EVENT assert_equal clients.size, c.read_messages(clients.size).size - } }.each(&:wait!) + end - clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_many_clients with_puma_server do |port| - clients = 100.times.map { faye_client(port) } - - clients.map {|c| Concurrent::Future.execute { - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) - } }.each(&:wait!) + clients = concurrently(100.times) { websocket_client(port) } + + concurrently(clients) do |c| + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message) + end - clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + concurrently(clients, &:close) end end def test_disappearing_client with_puma_server do |port| - c = faye_client(port) - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello') + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "delay", message: "hello") c.close # disappear before write - c = faye_client(port) - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') - assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') - assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) # pop the first welcome message off the stack + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) + c.send_message command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: JSON.generate(action: "ding", message: "hello") + assert_equal({ "identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{ "dong"=>"hello" } }, c.read_message) c.close # disappear before read end end + + def test_unsubscribe_client + with_puma_server do |port| + app = ActionCable.server + identifier = JSON.generate(channel: "ClientTest::EchoChannel") + + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) + c.send_message command: "subscribe", identifier: identifier + assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) + assert_equal(1, app.connections.count) + assert(app.remote_connections.where(identifier: identifier)) + + subscriptions = app.connections.first.subscriptions.send(:subscriptions) + assert_not_equal 0, subscriptions.size, "Missing EchoChannel subscription" + channel = subscriptions.first[1] + channel.expects(:unsubscribed) + c.close + sleep 0.1 # Data takes a moment to process + + # All data is removed: No more connection or subscription information! + assert_equal(0, app.connections.count) + end + end + + def test_server_restart + with_puma_server do |port| + c = websocket_client(port) + assert_equal({ "type" => "welcome" }, c.read_message) + c.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel") + assert_equal({ "identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription" }, c.read_message) + + ActionCable.server.restart + c.wait_for_close + assert c.closed? + end + end end |