diff options
author | Matthew Draper <matthew@trebex.net> | 2016-01-30 04:21:16 +1030 |
---|---|---|
committer | Matthew Draper <matthew@trebex.net> | 2016-01-30 04:21:16 +1030 |
commit | 703ddadafee29c86ca5be499a80802fbea70a64f (patch) | |
tree | fd7558a9a27970c7d494a6768bac25361a808cd6 /actioncable/test | |
parent | c8818dfcdf9e92364745000eefe46132a43f8700 (diff) | |
parent | 4d01cd1545a00ed6f96d6cb658a590afd36e1871 (diff) | |
download | rails-703ddadafee29c86ca5be499a80802fbea70a64f.tar.gz rails-703ddadafee29c86ca5be499a80802fbea70a64f.tar.bz2 rails-703ddadafee29c86ca5be499a80802fbea70a64f.zip |
Merge pull request #23305 from matthewd/concurrent-take-2
EventMachine -> concurrent-ruby, take two
Diffstat (limited to 'actioncable/test')
-rw-r--r-- | actioncable/test/channel/periodic_timers_test.rb | 2 | ||||
-rw-r--r-- | actioncable/test/channel/stream_test.rb | 22 | ||||
-rw-r--r-- | actioncable/test/client/echo_channel.rb | 18 | ||||
-rw-r--r-- | actioncable/test/client_test.rb | 262 | ||||
-rw-r--r-- | actioncable/test/connection/base_test.rb | 19 | ||||
-rw-r--r-- | actioncable/test/connection/identifier_test.rb | 4 | ||||
-rw-r--r-- | actioncable/test/connection/multiple_identifiers_test.rb | 4 | ||||
-rw-r--r-- | actioncable/test/stubs/test_server.rb | 3 | ||||
-rw-r--r-- | actioncable/test/subscription_adapter/common.rb | 3 | ||||
-rw-r--r-- | actioncable/test/test_helper.rb | 26 |
10 files changed, 312 insertions, 51 deletions
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 1590a12f09..64f0247cd6 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase end test "timer start and stop" do - EventMachine::PeriodicTimer.expects(:new).times(2).returns(true) + Concurrent::TimerTask.expects(:new).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 3fa2b291b7..947efd96d4 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -31,9 +31,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "stream_for" do run_in_eventmachine do connection = TestConnection.new - EM.next_tick do - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - end + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) @@ -41,39 +39,35 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end test "stream_from subscription confirmation" do - EM.run do + run_in_eventmachine do connection = TestConnection.new ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission - EM::Timer.new(0.1) do - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + wait_for_async - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) - EM.run_deferred_callbacks - EM.stop - end + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" end end test "subscription confirmation should only be sent out once" do - EM.run do + run_in_eventmachine do connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" channel.send_confirmation channel.send_confirmation - EM.run_deferred_callbacks + wait_for_async expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" assert_equal 1, connection.transmissions.size - EM.stop end end diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb new file mode 100644 index 0000000000..63e35f194a --- /dev/null +++ b/actioncable/test/client/echo_channel.rb @@ -0,0 +1,18 @@ +class EchoChannel < ActionCable::Channel::Base + def subscribed + stream_from "global" + end + + def ding(data) + transmit(dong: data['message']) + end + + def delay(data) + sleep 1 + transmit(dong: data['message']) + end + + def bulk(data) + ActionCable.server.broadcast "global", wide: data['message'] + end +end diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb new file mode 100644 index 0000000000..3d9690fb01 --- /dev/null +++ b/actioncable/test/client_test.rb @@ -0,0 +1,262 @@ +require 'test_helper' +require 'concurrent' + +require 'active_support/core_ext/hash/indifferent_access' +require 'pathname' + +require 'faye/websocket' +require 'json' + +class ClientTest < ActionCable::TestCase + WAIT_WHEN_EXPECTING_EVENT = 3 + WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2 + + def setup + # TODO: ActionCable requires a *lot* of setup at the moment... + ::Object.const_set(:ApplicationCable, Module.new) + ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base)) + + ::Object.const_set(:Rails, Module.new) + ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) } + + ActionCable.instance_variable_set(:@server, nil) + server = ActionCable.server + server.config = ActionCable::Server::Configuration.new + inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } + server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: []) + + server.config.cable = { adapter: 'async' }.with_indifferent_access + + # and now the "real" setup for our test: + server.config.disable_request_forgery_protection = true + server.config.channel_load_paths = [File.expand_path('client', __dir__)] + + @reactor_mutex = Mutex.new + @reactor_users = 0 + @reactor_running = Concurrent::Event.new + + # faye-websocket is warning-rich + @previous_verbose, $VERBOSE = $VERBOSE, nil + end + + def teardown + if @reactor_running.set? + EM.stop + end + + $VERBOSE = @previous_verbose + + begin + ::Object.send(:remove_const, :ApplicationCable) + rescue NameError + end + begin + ::Object.send(:remove_const, :Rails) + rescue NameError + end + end + + def with_puma_server(rack_app = ActionCable.server, port = 3099) + server = ::Puma::Server.new(rack_app, ::Puma::Events.strings) + server.add_tcp_listener '127.0.0.1', port + server.min_threads = 1 + server.max_threads = 4 + + t = Thread.new { server.run.join } + yield port + + ensure + server.stop(true) if server + t.join if t + end + + def start_event_machine + @reactor_mutex.synchronize do + unless @reactor_running.set? + @reactor ||= Thread.new do + EM.next_tick do + @reactor_running.set + end + EM.run + end + @reactor_running.wait(WAIT_WHEN_EXPECTING_EVENT) + end + @reactor_users += 1 + end + end + + def stop_event_machine + @reactor_mutex.synchronize do + @reactor_users -= 1 + if @reactor_users < 1 + @reactor_running.reset + EM.stop + @reactor = nil + end + end + end + + class SyncClient + attr_reader :pings + + def initialize(em_controller, port) + em_controller.start_event_machine + + @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/") + @messages = Queue.new + @closed = Concurrent::Event.new + @has_messages = Concurrent::Event.new + @pings = 0 + + open = Concurrent::Event.new + error = nil + + @ws.on(:error) do |event| + if open.set? + @messages << RuntimeError.new(event.message) + else + error = event.message + open.set + end + end + + @ws.on(:open) do |event| + open.set + end + + @ws.on(:message) do |event| + hash = JSON.parse(event.data) + if hash['identifier'] == '_ping' + @pings += 1 + else + @messages << hash + @has_messages.set + end + end + + @ws.on(:close) do |event| + em_controller.stop_event_machine + @closed.set + end + + open.wait(WAIT_WHEN_EXPECTING_EVENT) + raise error if error + end + + def read_message + @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty? + @has_messages.reset if @messages.size < 2 + + msg = @messages.pop(true) + raise msg if msg.is_a?(Exception) + + msg + end + + def read_messages(expected_size = 0) + list = [] + loop do + @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT) + if @has_messages.set? + list << read_message + else + break + end + end + list + end + + def send_message(hash) + @ws.send(JSON.dump(hash)) + end + + def close + sleep WAIT_WHEN_NOT_EXPECTING_EVENT + + unless @messages.empty? + raise "#{@messages.size} messages unprocessed" + end + + @ws.close + @closed.wait(WAIT_WHEN_EXPECTING_EVENT) + end + + def close! + sock = BasicSocket.for_fd(@ws.instance_variable_get(:@stream).detach) + + # Force a TCP reset + sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, [1, 0].pack('ii')) + + sock.close + end + end + + def faye_client(port) + SyncClient.new(self, port) + end + + def test_single_client + with_puma_server do |port| + c = faye_client(port) + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) + c.close + end + end + + def test_interacting_clients + with_puma_server do |port| + clients = 10.times.map { faye_client(port) } + + barrier_1 = Concurrent::CyclicBarrier.new(clients.size) + barrier_2 = Concurrent::CyclicBarrier.new(clients.size) + + clients.map {|c| Concurrent::Future.execute { + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + barrier_1.wait WAIT_WHEN_EXPECTING_EVENT + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello') + barrier_2.wait WAIT_WHEN_EXPECTING_EVENT + assert_equal clients.size, c.read_messages(clients.size).size + } }.each(&:wait!) + + clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + end + end + + def test_many_clients + with_puma_server do |port| + clients = 200.times.map { faye_client(port) } + + clients.map {|c| Concurrent::Future.execute { + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + } }.each(&:wait!) + + clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + end + end + + def test_disappearing_client + with_puma_server do |port| + c = faye_client(port) + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello') + c.close! # disappear before write + + c = faye_client(port) + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + c.close! # disappear before read + end + end +end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index 182562db82..e2b017a9a1 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -37,6 +37,8 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process assert connection.websocket.possible? + + wait_for_async assert connection.websocket.alive? end end @@ -53,16 +55,15 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase test "on connection open" do run_in_eventmachine do connection = open_connection - connection.process connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) - # Allow EM to run on_open callback - EM.next_tick do - assert_equal [ connection ], @server.connections - assert connection.connected - end + connection.process + wait_for_async + + assert_equal [ connection ], @server.connections + assert connection.connected end end @@ -72,12 +73,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process # Setup the connection - EventMachine.stubs(:add_periodic_timer).returns(true) - connection.send :on_open + Concurrent::TimerTask.stubs(:new).returns(true) + connection.send :handle_open assert connection.connected connection.subscriptions.expects(:unsubscribe_from_all) - connection.send :on_close + connection.send :handle_close assert ! connection.connected assert_equal [], @server.connections diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index a110dfdee0..1019ad541e 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index 55a9f96cb3..e9bb4e6d7f 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 6e6541a952..56d132b30a 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -14,6 +14,7 @@ class TestServer @config.subscription_adapter.new(self) end - def send_async + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index d4a13be889..361858784e 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,7 +1,6 @@ require 'test_helper' require 'concurrent' -require 'action_cable/process/logging' require 'active_support/core_ext/hash/indifferent_access' require 'pathname' @@ -24,8 +23,6 @@ module CommonSubscriptionAdapterTest # and now the "real" setup for our test: - spawn_eventmachine - server.config.cable = cable_config.with_indifferent_access adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 6636ce078b..8ddbd4e764 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,28 +13,16 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } -require 'faye/websocket' -class << Faye::WebSocket - remove_method :ensure_reactor_running - - # We don't want Faye to start the EM reactor in tests because it makes testing much harder. - # We want to be able to start and stop EM loop in tests to make things simpler. - def ensure_reactor_running - # no-op - end -end - class ActionCable::TestCase < ActiveSupport::TestCase - def run_in_eventmachine - EM.run do - yield - - EM.run_deferred_callbacks - EM.stop + def wait_for_async + e = Concurrent.global_io_executor + until e.completed_task_count == e.scheduled_task_count + sleep 0.1 end end - def spawn_eventmachine - Thread.new { EventMachine.run } unless EventMachine.reactor_running? + def run_in_eventmachine + yield + wait_for_async end end |