diff options
Diffstat (limited to 'actioncable')
26 files changed, 103 insertions, 387 deletions
diff --git a/actioncable/README.md b/actioncable/README.md index 63e328321b..cad71ddf94 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -443,11 +443,10 @@ The Ruby side of things is built on top of [faye-websocket](https://github.com/f ## Deployment -Action Cable is powered by a combination of EventMachine and threads. The -framework plumbing needed for connection handling is handled in the -EventMachine loop, but the actual channel, user-specified, work is handled -in a normal Ruby thread. This means you can use all your regular Rails models -with no problem, as long as you haven't committed any thread-safety sins. +Action Cable is powered by a combination of websockets and threads. All of the +connection management is handled internally by utilizing Ruby’s native thread +support, which means you can use all your regular Rails models with no problems +as long as you haven’t committed any thread-safety sins. But this also means that Action Cable needs to run in its own server process. So you'll have one set of server processes for your normal web work, and another diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 14f968f1ef..a36acc8f6f 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,7 +21,8 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' - s.add_dependency 'nio4r', '~> 1.2' + s.add_dependency 'eventmachine', '~> 1.0' + s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_development_dependency 'em-hiredis', '~> 0.3.0' diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 56597d02d7..7f0fb37afc 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do + active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.each { |timer| timer.cancel } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index a26373e387..e2876ef6fa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - Concurrent.global_io_executor.post do + EM.next_tick do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 902efb07e2..b672e00682 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,15 +5,12 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base - autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :Stream - autoload :StreamEventLoop + autoload :WebSocket autoload :Subscriptions autoload :TaggedLoggerProxy - autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 0016d1a1a4..bb8850aaa0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + delegate :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,6 +70,10 @@ module ActionCable logger.info started_request_message if websocket.possible? && allow_request_origin? + websocket.on(:open) { |event| send_async :on_open } + websocket.on(:message) { |event| on_message event.data } + websocket.on(:close) { |event| send_async :on_close } + respond_to_successful_request else respond_to_invalid_request @@ -117,22 +121,6 @@ module ActionCable transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end - def on_open # :nodoc: - send_async :handle_open - end - - def on_message(message) # :nodoc: - message_buffer.append message - end - - def on_error(message) # :nodoc: - # ignore - end - - def on_close # :nodoc: - send_async :handle_close - end - protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -151,7 +139,7 @@ module ActionCable attr_reader :message_buffer private - def handle_open + def on_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -162,7 +150,11 @@ module ActionCable respond_to_invalid_request end - def handle_close + def on_message(message) + message_buffer.append message + end + + def on_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb deleted file mode 100644 index 62dd753646..0000000000 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ /dev/null @@ -1,152 +0,0 @@ -require 'websocket/driver' - -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class ClientSocket # :nodoc: - def self.determine_url(env) - scheme = secure_request?(env) ? 'wss:' : 'ws:' - "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" - end - - def self.secure_request?(env) - return true if env['HTTPS'] == 'on' - return true if env['HTTP_X_FORWARDED_SSL'] == 'on' - return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' - return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' - return true if env['rack.url_scheme'] == 'https' - - return false - end - - CONNECTING = 0 - OPEN = 1 - CLOSING = 2 - CLOSED = 3 - - attr_reader :env, :url - - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop - - @url = ClientSocket.determine_url(@env) - - @driver = @driver_started = nil - - @ready_state = CONNECTING - - # The driver calls +env+, +url+, and +write+ - @driver = ::WebSocket::Driver.rack(self) - - @driver.on(:open) { |e| open } - @driver.on(:message) { |e| receive_message(e.data) } - @driver.on(:close) { |e| begin_close(e.reason, e.code) } - @driver.on(:error) { |e| emit_error(e.message) } - - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) - - if callback = @env['async.callback'] - callback.call([101, {}, @stream]) - end - end - - def start_driver - return if @driver.nil? || @driver_started - @driver_started = true - @driver.start - end - - def rack_response - start_driver - [ -1, {}, [] ] - end - - def write(data) - @stream.write(data) - end - - def transmit(message) - return false if @ready_state > OPEN - case message - when Numeric then @driver.text(message.to_s) - when String then @driver.text(message) - when Array then @driver.binary(message) - else false - end - end - - def close(code = nil, reason = nil) - code ||= 1000 - reason ||= '' - - unless code == 1000 or (code >= 3000 and code <= 4999) - raise ArgumentError, "Failed to execute 'close' on WebSocket: " + - "The code must be either 1000, or between 3000 and 4999. " + - "#{code} is neither." - end - - @ready_state = CLOSING unless @ready_state == CLOSED - @driver.close(reason, code) - end - - def parse(data) - @driver.parse(data) - end - - def client_gone - finalize_close - end - - def alive? - @ready_state == OPEN - end - - private - def open - return unless @ready_state == CONNECTING - @ready_state = OPEN - - @event_target.on_open - end - - def receive_message(data) - return unless @ready_state == OPEN - - @event_target.on_message(data) - end - - def emit_error(message) - return if @ready_state >= CLOSING - - @event_target.on_error(message) - end - - def begin_close(reason, code) - return if @ready_state == CLOSED - @ready_state = CLOSING - @close_params = [reason, code] - - if @stream - @stream.shutdown - else - finalize_close - end - end - - def finalize_close - return if @ready_state == CLOSED - @ready_state = CLOSED - - reason = @close_params ? @close_params[0] : '' - code = @close_params ? @close_params[1] : 1006 - - @event_target.on_close(code, reason) - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b3..54ed7672d2 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + EM.next_tick { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb deleted file mode 100644 index ace250cd16..0000000000 --- a/actioncable/lib/action_cable/connection/stream.rb +++ /dev/null @@ -1,59 +0,0 @@ -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class Stream - def initialize(event_loop, socket) - @event_loop = event_loop - @socket_object = socket - @stream_send = socket.env['stream.send'] - - @rack_hijack_io = nil - - hijack_rack_socket - end - - def each(&callback) - @stream_send ||= callback - end - - def close - shutdown - @socket_object.client_gone - end - - def shutdown - clean_rack_hijack - end - - def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send - rescue EOFError - @socket_object.client_gone - end - - def receive(data) - @socket_object.parse(data) - end - - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] - - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] - - @event_loop.attach(@rack_hijack_io, self) - end - - def clean_rack_hijack - return unless @rack_hijack_io - @event_loop.detach(@rack_hijack_io, self) - @rack_hijack_io = nil - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb deleted file mode 100644 index f773814973..0000000000 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ /dev/null @@ -1,68 +0,0 @@ -require 'nio' - -module ActionCable - module Connection - class StreamEventLoop - def initialize - @nio = NIO::Selector.new - @map = {} - @stopping = false - @todo = Queue.new - - Thread.new do - Thread.current.abort_on_exception = true - run - end - end - - def attach(io, stream) - @todo << lambda do - @map[io] = stream - @nio.register(io, :r) - end - @nio.wakeup - end - - def detach(io, stream) - @todo << lambda do - @nio.deregister(io) - @map.delete io - end - @nio.wakeup - end - - def stop - @stopping = true - @nio.wakeup - end - - def run - loop do - if @stopping - @nio.close - break - end - - until @todo.empty? - @todo.pop(true).call - end - - if monitors = @nio.select - monitors.each do |monitor| - io = monitor.io - stream = @map[io] - - begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next - rescue EOFError - stream.close - end - end - end - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b72..670d5690ae 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,11 +1,13 @@ -require 'websocket/driver' +require 'faye/websocket' module ActionCable module Connection - # Wrap the real socket to minimize the externally-presented API + # Decorate the Faye::WebSocket with helpers we need. class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + delegate :rack_response, :close, :on, to: :websocket + + def initialize(env) + @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil end def possible? @@ -13,19 +15,11 @@ module ActionCable end def alive? - websocket && websocket.alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end def transmit(data) - websocket.transmit data - end - - def close - websocket.close - end - - def rack_response - websocket.rack_response + websocket.send data end protected diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb new file mode 100644 index 0000000000..dce637b3ca --- /dev/null +++ b/actioncable/lib/action_cable/process/logging.rb @@ -0,0 +1,7 @@ +require 'action_cable/server' +require 'eventmachine' + +EM.error_handler do |e| + puts "Error raised inside the event loop: #{e.message}" + puts e.backtrace.join("\n") +end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index bd6a3826a3..a2a89d5f1e 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,3 +1,7 @@ +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index b00abd208c..3385a4c9f3 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,10 +32,6 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new - end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 8671dd5ebd..47dcea8c20 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,9 +22,11 @@ module ActionCable # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map(&:beat) } + end + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a..85d4892e4c 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed68..78f8aeb599 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + ::EM.next_tick(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988d..3b86354621 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,18 +1,11 @@ -require 'thread' - gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -34,7 +27,6 @@ module ActionCable private def redis_connection_for_subscriptions - ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -45,14 +37,6 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end - - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end end end end diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 64f0247cd6..1590a12f09 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase end test "timer start and stop" do - Concurrent::TimerTask.expects(:new).times(2).returns(true) + EventMachine::PeriodicTimer.expects(:new).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 947efd96d4..3fa2b291b7 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -31,7 +31,9 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "stream_for" do run_in_eventmachine do connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + EM.next_tick do + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + end channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) @@ -39,35 +41,39 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end test "stream_from subscription confirmation" do - run_in_eventmachine do + EM.run do connection = TestConnection.new ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission - wait_for_async + EM::Timer.new(0.1) do + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + EM.run_deferred_callbacks + EM.stop + end end end test "subscription confirmation should only be sent out once" do - run_in_eventmachine do + EM.run do connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" channel.send_confirmation channel.send_confirmation - wait_for_async + EM.run_deferred_callbacks expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" assert_equal 1, connection.transmissions.size + EM.stop end end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index e2b017a9a1..182562db82 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -37,8 +37,6 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process assert connection.websocket.possible? - - wait_for_async assert connection.websocket.alive? end end @@ -55,15 +53,16 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase test "on connection open" do run_in_eventmachine do connection = open_connection + connection.process connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) - connection.process - wait_for_async - - assert_equal [ connection ], @server.connections - assert connection.connected + # Allow EM to run on_open callback + EM.next_tick do + assert_equal [ connection ], @server.connections + assert connection.connected + end end end @@ -73,12 +72,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process # Setup the connection - Concurrent::TimerTask.stubs(:new).returns(true) - connection.send :handle_open + EventMachine.stubs(:add_periodic_timer).returns(true) + connection.send :on_open assert connection.connected connection.subscriptions.expects(:unsubscribe_from_all) - connection.send :handle_close + connection.send :on_close assert ! connection.connected assert_equal [], @server.connections diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index 1019ad541e..a110dfdee0 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :handle_open + @connection.send :on_open end def close_connection - @connection.send :handle_close + @connection.send :on_close end end diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index e9bb4e6d7f..55a9f96cb3 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :handle_open + @connection.send :on_open end def close_connection - @connection.send :handle_close + @connection.send :on_close end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 56d132b30a..6e6541a952 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -14,7 +14,6 @@ class TestServer @config.subscription_adapter.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + def send_async end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 361858784e..d4a13be889 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,6 +1,7 @@ require 'test_helper' require 'concurrent' +require 'action_cable/process/logging' require 'active_support/core_ext/hash/indifferent_access' require 'pathname' @@ -23,6 +24,8 @@ module CommonSubscriptionAdapterTest # and now the "real" setup for our test: + spawn_eventmachine + server.config.cable = cable_config.with_indifferent_access adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 8ddbd4e764..6636ce078b 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,16 +13,28 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } +require 'faye/websocket' +class << Faye::WebSocket + remove_method :ensure_reactor_running + + # We don't want Faye to start the EM reactor in tests because it makes testing much harder. + # We want to be able to start and stop EM loop in tests to make things simpler. + def ensure_reactor_running + # no-op + end +end + class ActionCable::TestCase < ActiveSupport::TestCase - def wait_for_async - e = Concurrent.global_io_executor - until e.completed_task_count == e.scheduled_task_count - sleep 0.1 + def run_in_eventmachine + EM.run do + yield + + EM.run_deferred_callbacks + EM.stop end end - def run_in_eventmachine - yield - wait_for_async + def spawn_eventmachine + Thread.new { EventMachine.run } unless EventMachine.reactor_running? end end |