diff options
Diffstat (limited to 'actioncable')
29 files changed, 227 insertions, 51 deletions
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 0f6e854520..b414255707 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,7 +27,7 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do + active_periodic_timers << connection.server.event_loop.timer(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 431a5c1063..23d7320a28 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -79,7 +79,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - Concurrent.global_io_executor.post do + connection.server.event_loop.post do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 902efb07e2..5f813cf8e0 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,6 +8,8 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel + autoload :FayeClientSocket + autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index afe0d958d7..7e7e777eaa 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,7 +49,7 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger, :worker_pool - delegate :stream_event_loop, :pubsub, to: :server + delegate :event_loop, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @@ -57,7 +57,7 @@ module ActionCable @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index f6b11e93f0..9e4dbcd6e6 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -29,10 +29,10 @@ module ActionCable attr_reader :env, :url - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop + def initialize(env, event_target, event_loop) + @env = env + @event_target = event_target + @event_loop = event_loop @url = ClientSocket.determine_url(@env) @@ -49,7 +49,7 @@ module ActionCable @driver.on(:close) { |e| begin_close(e.reason, e.code) } @driver.on(:error) { |e| emit_error(e.message) } - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + @stream = ActionCable::Connection::Stream.new(@event_loop, self) end def start_driver diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb new file mode 100644 index 0000000000..c9139b6858 --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -0,0 +1,42 @@ +require 'faye/websocket' + +module ActionCable + module Connection + class FayeClientSocket + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + + @faye = nil + end + + def alive? + @faye && @faye.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + connect + @faye.send data + end + + def close + @faye && @faye.close + end + + def rack_response + connect + @faye.rack_response + end + + private + def connect + return if @faye + @faye = Faye::WebSocket.new(@env) + + @faye.on(:open) { |event| @event_target.on_open } + @faye.on(:message) { |event| @event_target.on_message(event.data) } + @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb new file mode 100644 index 0000000000..8b70f3d84e --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -0,0 +1,44 @@ +require 'thread' + +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module Connection + class FayeEventLoop + @@mutex = Mutex.new + + def timer(interval, &block) + ensure_reactor_running + EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) + end + + def post(task = nil, &block) + task ||= block + + ensure_reactor_running + ::EM.next_tick(&task) + end + + private + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + + class EMTimer + def initialize(inner) + @inner = inner + end + + def shutdown + inner.cancel + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b3..3c5d39f59a 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + server.event_loop.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index e6335082d2..2abad09c03 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -11,7 +11,16 @@ module ActionCable @todo = Queue.new @spawn_mutex = Mutex.new - spawn + end + + def timer(interval, &block) + Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute) + end + + def post(task = nil, &block) + task ||= block + + Concurrent.global_io_executor << task end def attach(io, stream) diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b72..0bec9b6a96 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + def initialize(env, event_target, event_loop, client_socket_class) + @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil end def possible? diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index d9a2653cc2..778f5ffeed 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,4 +1,4 @@ -require 'thread' +require 'monitor' module ActionCable module Server @@ -18,8 +18,8 @@ module ActionCable attr_reader :mutex def initialize - @mutex = Mutex.new - @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil + @mutex = Monitor.new + @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil end # Called by Rack to setup the server. @@ -48,8 +48,8 @@ module ActionCable @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end - def stream_event_loop - @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new } + def event_loop + @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 9a7301287c..5fe71caed2 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -4,7 +4,7 @@ module ActionCable # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :connection_class, :worker_pool_size + attr_accessor :use_faye, :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins attr_accessor :cable, :url, :mount_path @@ -43,6 +43,22 @@ module ActionCable adapter = 'PostgreSQL' if adapter == 'Postgresql' "ActionCable::SubscriptionAdapter::#{adapter}".constantize end + + def event_loop_class + if use_faye + ActionCable::Connection::FayeEventLoop + else + ActionCable::Connection::StreamEventLoop + end + end + + def client_socket_class + if use_faye + ActionCable::Connection::FayeClientSocket + else + ActionCable::Connection::ClientSocket + end + end end end end diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 4dc8934b25..5e61b4e335 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -21,9 +21,9 @@ module ActionCable # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do + event_loop.post { connections.map(&:beat) } + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index cca6894289..10b3ac8cd8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -5,16 +5,21 @@ module ActionCable class Async < Inline # :nodoc: private def new_subscriber_map - AsyncSubscriberMap.new + AsyncSubscriberMap.new(server.event_loop) end class AsyncSubscriberMap < SubscriberMap + def initialize(event_loop) + @event_loop = event_loop + super() + end + def add_subscriber(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index abaeb92e54..66c7852f6e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -42,14 +42,15 @@ module ActionCable private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @queue = Queue.new @thread = Thread.new do @@ -68,7 +69,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + @event_loop.post(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -98,7 +99,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 6b4236e7d3..65434f7107 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -38,7 +38,7 @@ module ActionCable private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end def redis_connection_for_broadcasts @@ -52,10 +52,11 @@ module ActionCable end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @@ -84,7 +85,7 @@ module ActionCable if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift - Concurrent.global_io_executor << next_callback if next_callback + @event_loop.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end end @@ -133,7 +134,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end private diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 64f0247cd6..e6f0c14c9d 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase end test "timer start and stop" do - Concurrent::TimerTask.expects(:new).times(2).returns(true) + @connection.server.event_loop.expects(:timer).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index a6619d3bd2..30620c792b 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -8,8 +8,8 @@ require 'faye/websocket' require 'json' class ClientTest < ActionCable::TestCase - WAIT_WHEN_EXPECTING_EVENT = 3 - WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2 + WAIT_WHEN_EXPECTING_EVENT = 8 + WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5 def setup ActionCable.instance_variable_set(:@server, nil) @@ -17,6 +17,7 @@ class ClientTest < ActionCable::TestCase server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } server.config.cable = { adapter: 'async' }.with_indifferent_access + server.config.use_faye = ENV['FAYE'].present? # and now the "real" setup for our test: server.config.disable_request_forgery_protection = true diff --git a/actioncable/test/connection/authorization_test.rb b/actioncable/test/connection/authorization_test.rb index 87d0e79ef3..a0506cb9c0 100644 --- a/actioncable/test/connection/authorization_test.rb +++ b/actioncable/test/connection/authorization_test.rb @@ -20,7 +20,7 @@ class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase server.config.allowed_request_origins = %w( http://rubyonrails.com ) env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_ORIGIN' => 'http://rubyonrails.com' + 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' connection = Connection.new(server, env) connection.websocket.expects(:close) diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index fb11f9be64..8a25d2a378 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -1,5 +1,6 @@ require 'test_helper' require 'stubs/test_server' +require 'active_support/core_ext/object/json' class ActionCable::Connection::BaseTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base @@ -73,7 +74,7 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process # Setup the connection - Concurrent::TimerTask.stubs(:new).returns(true) + connection.server.stubs(:timer).returns(true) connection.send :handle_open assert connection.connected @@ -119,7 +120,7 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase env = Rack::MockRequest.env_for( "/test", { 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_ORIGIN' => 'http://rubyonrails.org', 'rack.hijack' => CallMeMaybe.new } + 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.org', 'rack.hijack' => CallMeMaybe.new } ) connection = ActionCable::Connection::Base.new(@server, env) @@ -131,7 +132,7 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase private def open_connection env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', - 'HTTP_ORIGIN' => 'http://rubyonrails.com' + 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com' Connection.new(@server, env) end diff --git a/actioncable/test/connection/cross_site_forgery_test.rb b/actioncable/test/connection/cross_site_forgery_test.rb index a29f65fb97..2d516b0533 100644 --- a/actioncable/test/connection/cross_site_forgery_test.rb +++ b/actioncable/test/connection/cross_site_forgery_test.rb @@ -76,6 +76,6 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase def env_for_origin(origin) Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'SERVER_NAME' => HOST, - 'HTTP_ORIGIN' => origin + 'HTTP_HOST' => HOST, 'HTTP_ORIGIN' => origin end end diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index 1019ad541e..c3d5f1f90b 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -64,7 +64,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase end def open_connection(server:) - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' @connection = Connection.new(server, env) @connection.process diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index e9bb4e6d7f..484e73bb30 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -28,7 +28,7 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase end def open_connection(server:) - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' @connection = Connection.new(server, env) @connection.process diff --git a/actioncable/test/connection/string_identifier_test.rb b/actioncable/test/connection/string_identifier_test.rb index 9d0bda83ef..eca0c31060 100644 --- a/actioncable/test/connection/string_identifier_test.rb +++ b/actioncable/test/connection/string_identifier_test.rb @@ -30,7 +30,7 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase end def open_connection - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' @connection = Connection.new(@server, env) @connection.process diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb index 62e41484fe..30819d64af 100644 --- a/actioncable/test/connection/subscriptions_test.rb +++ b/actioncable/test/connection/subscriptions_test.rb @@ -107,7 +107,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase end def setup_connection - env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' + env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket' @connection = Connection.new(@server, env) @subscriptions = ActionCable::Connection::Subscriptions.new(@connection) diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb index da98201900..8ba284fdc6 100644 --- a/actioncable/test/stubs/test_connection.rb +++ b/actioncable/test/stubs/test_connection.rb @@ -1,18 +1,19 @@ require 'stubs/user' class TestConnection - attr_reader :identifiers, :logger, :current_user, :transmissions + attr_reader :identifiers, :logger, :current_user, :server, :transmissions def initialize(user = User.new("lifo")) @identifiers = [ :current_user ] @current_user = user @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) + @server = TestServer.new @transmissions = [] end def pubsub - SuccessAdapter.new(TestServer.new) + SuccessAdapter.new(server) end def transmit(data) diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 5916cf1e83..9e860825f3 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -8,14 +8,24 @@ class TestServer def initialize @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter) + @config.use_faye = ENV['FAYE'].present? + @config.client_socket_class = if @config.use_faye + ActionCable::Connection::FayeClientSocket + else + ActionCable::Connection::ClientSocket + end end def pubsub @config.subscription_adapter.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + def event_loop + @event_loop ||= if @config.use_faye + ActionCable::Connection::FayeEventLoop.new + else + ActionCable::Connection::StreamEventLoop.new + end end def worker_pool diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index b31c2aa36c..82f0abbbf3 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -11,6 +11,7 @@ module CommonSubscriptionAdapterTest def setup server = ActionCable::Server::Base.new server.config.cable = cable_config.with_indifferent_access + server.config.use_faye = ENV['FAYE'].present? adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 8ddbd4e764..1a95bef32c 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,7 +13,41 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } -class ActionCable::TestCase < ActiveSupport::TestCase +if ENV['FAYE'].present? + require 'faye/websocket' + class << Faye::WebSocket + remove_method :ensure_reactor_running + + # We don't want Faye to start the EM reactor in tests because it makes testing much harder. + # We want to be able to start and stop EM loop in tests to make things simpler. + def ensure_reactor_running + # no-op + end + end +end + +module EventMachineConcurrencyHelpers + def wait_for_async + EM.run_deferred_callbacks + end + + def run_in_eventmachine + failure = nil + EM.run do + begin + yield + rescue => ex + failure = ex + ensure + wait_for_async + EM.stop if EM.reactor_running? + end + end + raise failure if failure + end +end + +module ConcurrentRubyConcurrencyHelpers def wait_for_async e = Concurrent.global_io_executor until e.completed_task_count == e.scheduled_task_count @@ -26,3 +60,11 @@ class ActionCable::TestCase < ActiveSupport::TestCase wait_for_async end end + +class ActionCable::TestCase < ActiveSupport::TestCase + if ENV['FAYE'].present? + include EventMachineConcurrencyHelpers + else + include ConcurrentRubyConcurrencyHelpers + end +end |