diff options
Diffstat (limited to 'actioncable')
25 files changed, 382 insertions, 99 deletions
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index a36acc8f6f..14f968f1ef 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,8 +21,7 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' - s.add_dependency 'eventmachine', '~> 1.0' - s.add_dependency 'faye-websocket', '~> 0.10.0' + s.add_dependency 'nio4r', '~> 1.2' s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_development_dependency 'em-hiredis', '~> 0.3.0' diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 7f0fb37afc..56597d02d7 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do + active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.cancel } + active_periodic_timers.each { |timer| timer.shutdown } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index e2876ef6fa..a26373e387 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - EM.next_tick do + Concurrent.global_io_executor.post do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index b672e00682..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,12 +5,15 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base + autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :WebSocket + autoload :Stream + autoload :StreamEventLoop autoload :Subscriptions autoload :TaggedLoggerProxy + autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..0016d1a1a4 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + delegate :stream_event_loop, :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env) + @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,10 +70,6 @@ module ActionCable logger.info started_request_message if websocket.possible? && allow_request_origin? - websocket.on(:open) { |event| send_async :on_open } - websocket.on(:message) { |event| on_message event.data } - websocket.on(:close) { |event| send_async :on_close } - respond_to_successful_request else respond_to_invalid_request @@ -121,6 +117,22 @@ module ActionCable transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end + def on_open # :nodoc: + send_async :handle_open + end + + def on_message(message) # :nodoc: + message_buffer.append message + end + + def on_error(message) # :nodoc: + # ignore + end + + def on_close # :nodoc: + send_async :handle_close + end + protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -139,7 +151,7 @@ module ActionCable attr_reader :message_buffer private - def on_open + def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -150,11 +162,7 @@ module ActionCable respond_to_invalid_request end - def on_message(message) - message_buffer.append message - end - - def on_close + def handle_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb new file mode 100644 index 0000000000..62dd753646 --- /dev/null +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -0,0 +1,152 @@ +require 'websocket/driver' + +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class ClientSocket # :nodoc: + def self.determine_url(env) + scheme = secure_request?(env) ? 'wss:' : 'ws:' + "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" + end + + def self.secure_request?(env) + return true if env['HTTPS'] == 'on' + return true if env['HTTP_X_FORWARDED_SSL'] == 'on' + return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' + return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' + return true if env['rack.url_scheme'] == 'https' + + return false + end + + CONNECTING = 0 + OPEN = 1 + CLOSING = 2 + CLOSED = 3 + + attr_reader :env, :url + + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + @stream_event_loop = stream_event_loop + + @url = ClientSocket.determine_url(@env) + + @driver = @driver_started = nil + + @ready_state = CONNECTING + + # The driver calls +env+, +url+, and +write+ + @driver = ::WebSocket::Driver.rack(self) + + @driver.on(:open) { |e| open } + @driver.on(:message) { |e| receive_message(e.data) } + @driver.on(:close) { |e| begin_close(e.reason, e.code) } + @driver.on(:error) { |e| emit_error(e.message) } + + @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + + if callback = @env['async.callback'] + callback.call([101, {}, @stream]) + end + end + + def start_driver + return if @driver.nil? || @driver_started + @driver_started = true + @driver.start + end + + def rack_response + start_driver + [ -1, {}, [] ] + end + + def write(data) + @stream.write(data) + end + + def transmit(message) + return false if @ready_state > OPEN + case message + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) + else false + end + end + + def close(code = nil, reason = nil) + code ||= 1000 + reason ||= '' + + unless code == 1000 or (code >= 3000 and code <= 4999) + raise ArgumentError, "Failed to execute 'close' on WebSocket: " + + "The code must be either 1000, or between 3000 and 4999. " + + "#{code} is neither." + end + + @ready_state = CLOSING unless @ready_state == CLOSED + @driver.close(reason, code) + end + + def parse(data) + @driver.parse(data) + end + + def client_gone + finalize_close + end + + def alive? + @ready_state == OPEN + end + + private + def open + return unless @ready_state == CONNECTING + @ready_state = OPEN + + @event_target.on_open + end + + def receive_message(data) + return unless @ready_state == OPEN + + @event_target.on_message(data) + end + + def emit_error(message) + return if @ready_state >= CLOSING + + @event_target.on_error(message) + end + + def begin_close(reason, code) + return if @ready_state == CLOSED + @ready_state = CLOSING + @close_params = [reason, code] + + if @stream + @stream.shutdown + else + finalize_close + end + end + + def finalize_close + return if @ready_state == CLOSED + @ready_state = CLOSED + + reason = @close_params ? @close_params[0] : '' + code = @close_params ? @close_params[1] : 1006 + + @event_target.on_close(code, reason) + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 54ed7672d2..27826792b3 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, callback) } + Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb new file mode 100644 index 0000000000..ace250cd16 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -0,0 +1,59 @@ +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class Stream + def initialize(event_loop, socket) + @event_loop = event_loop + @socket_object = socket + @stream_send = socket.env['stream.send'] + + @rack_hijack_io = nil + + hijack_rack_socket + end + + def each(&callback) + @stream_send ||= callback + end + + def close + shutdown + @socket_object.client_gone + end + + def shutdown + clean_rack_hijack + end + + def write(data) + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + rescue EOFError + @socket_object.client_gone + end + + def receive(data) + @socket_object.parse(data) + end + + private + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] + + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] + + @event_loop.attach(@rack_hijack_io, self) + end + + def clean_rack_hijack + return unless @rack_hijack_io + @event_loop.detach(@rack_hijack_io, self) + @rack_hijack_io = nil + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb new file mode 100644 index 0000000000..f773814973 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -0,0 +1,68 @@ +require 'nio' + +module ActionCable + module Connection + class StreamEventLoop + def initialize + @nio = NIO::Selector.new + @map = {} + @stopping = false + @todo = Queue.new + + Thread.new do + Thread.current.abort_on_exception = true + run + end + end + + def attach(io, stream) + @todo << lambda do + @map[io] = stream + @nio.register(io, :r) + end + @nio.wakeup + end + + def detach(io, stream) + @todo << lambda do + @nio.deregister(io) + @map.delete io + end + @nio.wakeup + end + + def stop + @stopping = true + @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + if monitors = @nio.select + monitors.each do |monitor| + io = monitor.io + stream = @map[io] + + begin + stream.receive io.read_nonblock(4096) + rescue IO::WaitReadable + next + rescue EOFError + stream.close + end + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 670d5690ae..5e89fb9b72 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,13 +1,11 @@ -require 'faye/websocket' +require 'websocket/driver' module ActionCable module Connection - # Decorate the Faye::WebSocket with helpers we need. + # Wrap the real socket to minimize the externally-presented API class WebSocket - delegate :rack_response, :close, :on, to: :websocket - - def initialize(env) - @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + def initialize(env, event_target, stream_event_loop) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil end def possible? @@ -15,11 +13,19 @@ module ActionCable end def alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + websocket && websocket.alive? end def transmit(data) - websocket.send data + websocket.transmit data + end + + def close + websocket.close + end + + def rack_response + websocket.rack_response end protected diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb deleted file mode 100644 index dce637b3ca..0000000000 --- a/actioncable/lib/action_cable/process/logging.rb +++ /dev/null @@ -1,7 +0,0 @@ -require 'action_cable/server' -require 'eventmachine' - -EM.error_handler do |e| - puts "Error raised inside the event loop: #{e.message}" - puts e.backtrace.join("\n") -end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index a2a89d5f1e..bd6a3826a3 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,7 +1,3 @@ -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3385a4c9f3..b00abd208c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,6 +32,10 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + end + # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 47dcea8c20..8671dd5ebd 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,11 +22,9 @@ module ActionCable # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - EM.next_tick do - @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do - EM.next_tick { connections.map(&:beat) } - end - end + @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do + Concurrent.global_io_executor.post { connections.map(&:beat) } + end.tap(&:execute) end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 85d4892e4c..c88b03947a 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..3ce1bbed68 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + Concurrent.global_io_executor << callback if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 3b86354621..a035e3988d 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,11 +1,18 @@ +require 'thread' + gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + @@mutex = Mutex.new + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -27,6 +34,7 @@ module ActionCable private def redis_connection_for_subscriptions + ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -37,6 +45,14 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end end end end diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 1590a12f09..64f0247cd6 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase end test "timer start and stop" do - EventMachine::PeriodicTimer.expects(:new).times(2).returns(true) + Concurrent::TimerTask.expects(:new).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 3fa2b291b7..947efd96d4 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -31,9 +31,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "stream_for" do run_in_eventmachine do connection = TestConnection.new - EM.next_tick do - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - end + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) @@ -41,39 +39,35 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end test "stream_from subscription confirmation" do - EM.run do + run_in_eventmachine do connection = TestConnection.new ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission - EM::Timer.new(0.1) do - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + wait_for_async - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) - EM.run_deferred_callbacks - EM.stop - end + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" end end test "subscription confirmation should only be sent out once" do - EM.run do + run_in_eventmachine do connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" channel.send_confirmation channel.send_confirmation - EM.run_deferred_callbacks + wait_for_async expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" assert_equal 1, connection.transmissions.size - EM.stop end end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index 182562db82..e2b017a9a1 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -37,6 +37,8 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process assert connection.websocket.possible? + + wait_for_async assert connection.websocket.alive? end end @@ -53,16 +55,15 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase test "on connection open" do run_in_eventmachine do connection = open_connection - connection.process connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) - # Allow EM to run on_open callback - EM.next_tick do - assert_equal [ connection ], @server.connections - assert connection.connected - end + connection.process + wait_for_async + + assert_equal [ connection ], @server.connections + assert connection.connected end end @@ -72,12 +73,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process # Setup the connection - EventMachine.stubs(:add_periodic_timer).returns(true) - connection.send :on_open + Concurrent::TimerTask.stubs(:new).returns(true) + connection.send :handle_open assert connection.connected connection.subscriptions.expects(:unsubscribe_from_all) - connection.send :on_close + connection.send :handle_close assert ! connection.connected assert_equal [], @server.connections diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index a110dfdee0..1019ad541e 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index 55a9f96cb3..e9bb4e6d7f 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 6e6541a952..56d132b30a 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -14,6 +14,7 @@ class TestServer @config.subscription_adapter.new(self) end - def send_async + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index d4a13be889..361858784e 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,7 +1,6 @@ require 'test_helper' require 'concurrent' -require 'action_cable/process/logging' require 'active_support/core_ext/hash/indifferent_access' require 'pathname' @@ -24,8 +23,6 @@ module CommonSubscriptionAdapterTest # and now the "real" setup for our test: - spawn_eventmachine - server.config.cable = cable_config.with_indifferent_access adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 6636ce078b..8ddbd4e764 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,28 +13,16 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } -require 'faye/websocket' -class << Faye::WebSocket - remove_method :ensure_reactor_running - - # We don't want Faye to start the EM reactor in tests because it makes testing much harder. - # We want to be able to start and stop EM loop in tests to make things simpler. - def ensure_reactor_running - # no-op - end -end - class ActionCable::TestCase < ActiveSupport::TestCase - def run_in_eventmachine - EM.run do - yield - - EM.run_deferred_callbacks - EM.stop + def wait_for_async + e = Concurrent.global_io_executor + until e.completed_task_count == e.scheduled_task_count + sleep 0.1 end end - def spawn_eventmachine - Thread.new { EventMachine.run } unless EventMachine.reactor_running? + def run_in_eventmachine + yield + wait_for_async end end |