aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb2
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb2
-rw-r--r--actioncable/lib/action_cable/connection.rb2
-rw-r--r--actioncable/lib/action_cable/connection/base.rb4
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb10
-rw-r--r--actioncable/lib/action_cable/connection/faye_client_socket.rb42
-rw-r--r--actioncable/lib/action_cable/connection/faye_event_loop.rb44
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb11
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb10
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb18
-rw-r--r--actioncable/lib/action_cable/server/connections.rb6
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb9
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb9
-rw-r--r--actioncable/test/channel/periodic_timers_test.rb2
-rw-r--r--actioncable/test/client_test.rb5
-rw-r--r--actioncable/test/connection/authorization_test.rb2
-rw-r--r--actioncable/test/connection/base_test.rb7
-rw-r--r--actioncable/test/connection/cross_site_forgery_test.rb2
-rw-r--r--actioncable/test/connection/identifier_test.rb2
-rw-r--r--actioncable/test/connection/multiple_identifiers_test.rb2
-rw-r--r--actioncable/test/connection/string_identifier_test.rb2
-rw-r--r--actioncable/test/connection/subscriptions_test.rb2
-rw-r--r--actioncable/test/stubs/test_connection.rb5
-rw-r--r--actioncable/test/stubs/test_server.rb14
-rw-r--r--actioncable/test/subscription_adapter/common.rb1
-rw-r--r--actioncable/test/test_helper.rb44
29 files changed, 227 insertions, 51 deletions
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 0f6e854520..b414255707 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -27,7 +27,7 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
+ active_periodic_timers << connection.server.event_loop.timer(options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 431a5c1063..23d7320a28 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -79,7 +79,7 @@ module ActionCable
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- Concurrent.global_io_executor.post do
+ connection.server.event_loop.post do
pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index 902efb07e2..5f813cf8e0 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -8,6 +8,8 @@ module ActionCable
autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
+ autoload :FayeClientSocket
+ autoload :FayeEventLoop
autoload :MessageBuffer
autoload :Stream
autoload :StreamEventLoop
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index afe0d958d7..7e7e777eaa 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,7 +49,7 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
- delegate :stream_event_loop, :pubsub, to: :server
+ delegate :event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@@ -57,7 +57,7 @@ module ActionCable
@worker_pool = server.worker_pool
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index f6b11e93f0..9e4dbcd6e6 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -29,10 +29,10 @@ module ActionCable
attr_reader :env, :url
- def initialize(env, event_target, stream_event_loop)
- @env = env
- @event_target = event_target
- @stream_event_loop = stream_event_loop
+ def initialize(env, event_target, event_loop)
+ @env = env
+ @event_target = event_target
+ @event_loop = event_loop
@url = ClientSocket.determine_url(@env)
@@ -49,7 +49,7 @@ module ActionCable
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }
- @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+ @stream = ActionCable::Connection::Stream.new(@event_loop, self)
end
def start_driver
diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb
new file mode 100644
index 0000000000..c9139b6858
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb
@@ -0,0 +1,42 @@
+require 'faye/websocket'
+
+module ActionCable
+ module Connection
+ class FayeClientSocket
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+
+ @faye = nil
+ end
+
+ def alive?
+ @faye && @faye.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ def transmit(data)
+ connect
+ @faye.send data
+ end
+
+ def close
+ @faye && @faye.close
+ end
+
+ def rack_response
+ connect
+ @faye.rack_response
+ end
+
+ private
+ def connect
+ return if @faye
+ @faye = Faye::WebSocket.new(@env)
+
+ @faye.on(:open) { |event| @event_target.on_open }
+ @faye.on(:message) { |event| @event_target.on_message(event.data) }
+ @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb
new file mode 100644
index 0000000000..8b70f3d84e
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb
@@ -0,0 +1,44 @@
+require 'thread'
+
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module Connection
+ class FayeEventLoop
+ @@mutex = Mutex.new
+
+ def timer(interval, &block)
+ ensure_reactor_running
+ EMTimer.new(::EM::PeriodicTimer.new(interval, &block))
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ ensure_reactor_running
+ ::EM.next_tick(&task)
+ end
+
+ private
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+
+ class EMTimer
+ def initialize(inner)
+ @inner = inner
+ end
+
+ def shutdown
+ inner.cancel
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 27826792b3..3c5d39f59a 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
+ server.event_loop.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index e6335082d2..2abad09c03 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -11,7 +11,16 @@ module ActionCable
@todo = Queue.new
@spawn_mutex = Mutex.new
- spawn
+ end
+
+ def timer(interval, &block)
+ Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ Concurrent.global_io_executor << task
end
def attach(io, stream)
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 5e89fb9b72..0bec9b6a96 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -4,8 +4,8 @@ module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
class WebSocket
- def initialize(env, event_target, stream_event_loop)
- @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
+ def initialize(env, event_target, event_loop, client_socket_class)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil
end
def possible?
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index d9a2653cc2..778f5ffeed 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,4 +1,4 @@
-require 'thread'
+require 'monitor'
module ActionCable
module Server
@@ -18,8 +18,8 @@ module ActionCable
attr_reader :mutex
def initialize
- @mutex = Mutex.new
- @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
+ @mutex = Monitor.new
+ @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by Rack to setup the server.
@@ -48,8 +48,8 @@ module ActionCable
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
- def stream_event_loop
- @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
+ def event_loop
+ @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 9a7301287c..5fe71caed2 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -4,7 +4,7 @@ module ActionCable
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
- attr_accessor :connection_class, :worker_pool_size
+ attr_accessor :use_faye, :connection_class, :worker_pool_size
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :cable, :url, :mount_path
@@ -43,6 +43,22 @@ module ActionCable
adapter = 'PostgreSQL' if adapter == 'Postgresql'
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
end
+
+ def event_loop_class
+ if use_faye
+ ActionCable::Connection::FayeEventLoop
+ else
+ ActionCable::Connection::StreamEventLoop
+ end
+ end
+
+ def client_socket_class
+ if use_faye
+ ActionCable::Connection::FayeClientSocket
+ else
+ ActionCable::Connection::ClientSocket
+ end
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 4dc8934b25..5e61b4e335 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -21,9 +21,9 @@ module ActionCable
# then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
- @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
- Concurrent.global_io_executor.post { connections.map(&:beat) }
- end.tap(&:execute)
+ @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do
+ event_loop.post { connections.map(&:beat) }
+ end
end
def open_connections_statistics
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index cca6894289..10b3ac8cd8 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -5,16 +5,21 @@ module ActionCable
class Async < Inline # :nodoc:
private
def new_subscriber_map
- AsyncSubscriberMap.new
+ AsyncSubscriberMap.new(server.event_loop)
end
class AsyncSubscriberMap < SubscriberMap
+ def initialize(event_loop)
+ @event_loop = event_loop
+ super()
+ end
+
def add_subscriber(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index abaeb92e54..66c7852f6e 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -42,14 +42,15 @@ module ActionCable
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@queue = Queue.new
@thread = Thread.new do
@@ -68,7 +69,7 @@ module ActionCable
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
- Concurrent.global_io_executor << callback if callback
+ @event_loop.post(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@@ -98,7 +99,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 6b4236e7d3..65434f7107 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -38,7 +38,7 @@ module ActionCable
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def redis_connection_for_broadcasts
@@ -52,10 +52,11 @@ module ActionCable
end
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@@ -84,7 +85,7 @@ module ActionCable
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
- Concurrent.global_io_executor << next_callback if next_callback
+ @event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
@@ -133,7 +134,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
private
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb
index 64f0247cd6..e6f0c14c9d 100644
--- a/actioncable/test/channel/periodic_timers_test.rb
+++ b/actioncable/test/channel/periodic_timers_test.rb
@@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
end
test "timer start and stop" do
- Concurrent::TimerTask.expects(:new).times(2).returns(true)
+ @connection.server.event_loop.expects(:timer).times(2).returns(true)
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
channel.expects(:stop_periodic_timers).once
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
index a6619d3bd2..30620c792b 100644
--- a/actioncable/test/client_test.rb
+++ b/actioncable/test/client_test.rb
@@ -8,8 +8,8 @@ require 'faye/websocket'
require 'json'
class ClientTest < ActionCable::TestCase
- WAIT_WHEN_EXPECTING_EVENT = 3
- WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+ WAIT_WHEN_EXPECTING_EVENT = 8
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
def setup
ActionCable.instance_variable_set(:@server, nil)
@@ -17,6 +17,7 @@ class ClientTest < ActionCable::TestCase
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
server.config.cable = { adapter: 'async' }.with_indifferent_access
+ server.config.use_faye = ENV['FAYE'].present?
# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true
diff --git a/actioncable/test/connection/authorization_test.rb b/actioncable/test/connection/authorization_test.rb
index 87d0e79ef3..a0506cb9c0 100644
--- a/actioncable/test/connection/authorization_test.rb
+++ b/actioncable/test/connection/authorization_test.rb
@@ -20,7 +20,7 @@ class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase
server.config.allowed_request_origins = %w( http://rubyonrails.com )
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
- 'HTTP_ORIGIN' => 'http://rubyonrails.com'
+ 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com'
connection = Connection.new(server, env)
connection.websocket.expects(:close)
diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb
index fb11f9be64..8a25d2a378 100644
--- a/actioncable/test/connection/base_test.rb
+++ b/actioncable/test/connection/base_test.rb
@@ -1,5 +1,6 @@
require 'test_helper'
require 'stubs/test_server'
+require 'active_support/core_ext/object/json'
class ActionCable::Connection::BaseTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
@@ -73,7 +74,7 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
connection.process
# Setup the connection
- Concurrent::TimerTask.stubs(:new).returns(true)
+ connection.server.stubs(:timer).returns(true)
connection.send :handle_open
assert connection.connected
@@ -119,7 +120,7 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
env = Rack::MockRequest.env_for(
"/test",
{ 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
- 'HTTP_ORIGIN' => 'http://rubyonrails.org', 'rack.hijack' => CallMeMaybe.new }
+ 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.org', 'rack.hijack' => CallMeMaybe.new }
)
connection = ActionCable::Connection::Base.new(@server, env)
@@ -131,7 +132,7 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
private
def open_connection
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
- 'HTTP_ORIGIN' => 'http://rubyonrails.com'
+ 'HTTP_HOST' => 'localhost', 'HTTP_ORIGIN' => 'http://rubyonrails.com'
Connection.new(@server, env)
end
diff --git a/actioncable/test/connection/cross_site_forgery_test.rb b/actioncable/test/connection/cross_site_forgery_test.rb
index a29f65fb97..2d516b0533 100644
--- a/actioncable/test/connection/cross_site_forgery_test.rb
+++ b/actioncable/test/connection/cross_site_forgery_test.rb
@@ -76,6 +76,6 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase
def env_for_origin(origin)
Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'SERVER_NAME' => HOST,
- 'HTTP_ORIGIN' => origin
+ 'HTTP_HOST' => HOST, 'HTTP_ORIGIN' => origin
end
end
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
index 1019ad541e..c3d5f1f90b 100644
--- a/actioncable/test/connection/identifier_test.rb
+++ b/actioncable/test/connection/identifier_test.rb
@@ -64,7 +64,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
end
def open_connection(server:)
- env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(server, env)
@connection.process
diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb
index e9bb4e6d7f..484e73bb30 100644
--- a/actioncable/test/connection/multiple_identifiers_test.rb
+++ b/actioncable/test/connection/multiple_identifiers_test.rb
@@ -28,7 +28,7 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase
end
def open_connection(server:)
- env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(server, env)
@connection.process
diff --git a/actioncable/test/connection/string_identifier_test.rb b/actioncable/test/connection/string_identifier_test.rb
index 9d0bda83ef..eca0c31060 100644
--- a/actioncable/test/connection/string_identifier_test.rb
+++ b/actioncable/test/connection/string_identifier_test.rb
@@ -30,7 +30,7 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
end
def open_connection
- env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(@server, env)
@connection.process
diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb
index 62e41484fe..30819d64af 100644
--- a/actioncable/test/connection/subscriptions_test.rb
+++ b/actioncable/test/connection/subscriptions_test.rb
@@ -107,7 +107,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase
end
def setup_connection
- env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
+ env = Rack::MockRequest.env_for "/test", 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(@server, env)
@subscriptions = ActionCable::Connection::Subscriptions.new(@connection)
diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb
index da98201900..8ba284fdc6 100644
--- a/actioncable/test/stubs/test_connection.rb
+++ b/actioncable/test/stubs/test_connection.rb
@@ -1,18 +1,19 @@
require 'stubs/user'
class TestConnection
- attr_reader :identifiers, :logger, :current_user, :transmissions
+ attr_reader :identifiers, :logger, :current_user, :server, :transmissions
def initialize(user = User.new("lifo"))
@identifiers = [ :current_user ]
@current_user = user
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
+ @server = TestServer.new
@transmissions = []
end
def pubsub
- SuccessAdapter.new(TestServer.new)
+ SuccessAdapter.new(server)
end
def transmit(data)
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index 5916cf1e83..9e860825f3 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -8,14 +8,24 @@ class TestServer
def initialize
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter)
+ @config.use_faye = ENV['FAYE'].present?
+ @config.client_socket_class = if @config.use_faye
+ ActionCable::Connection::FayeClientSocket
+ else
+ ActionCable::Connection::ClientSocket
+ end
end
def pubsub
@config.subscription_adapter.new(self)
end
- def stream_event_loop
- @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
+ def event_loop
+ @event_loop ||= if @config.use_faye
+ ActionCable::Connection::FayeEventLoop.new
+ else
+ ActionCable::Connection::StreamEventLoop.new
+ end
end
def worker_pool
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
index b31c2aa36c..82f0abbbf3 100644
--- a/actioncable/test/subscription_adapter/common.rb
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -11,6 +11,7 @@ module CommonSubscriptionAdapterTest
def setup
server = ActionCable::Server::Base.new
server.config.cable = cable_config.with_indifferent_access
+ server.config.use_faye = ENV['FAYE'].present?
adapter_klass = server.config.pubsub_adapter
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 8ddbd4e764..1a95bef32c 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -13,7 +13,41 @@ require 'rack/mock'
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
-class ActionCable::TestCase < ActiveSupport::TestCase
+if ENV['FAYE'].present?
+ require 'faye/websocket'
+ class << Faye::WebSocket
+ remove_method :ensure_reactor_running
+
+ # We don't want Faye to start the EM reactor in tests because it makes testing much harder.
+ # We want to be able to start and stop EM loop in tests to make things simpler.
+ def ensure_reactor_running
+ # no-op
+ end
+ end
+end
+
+module EventMachineConcurrencyHelpers
+ def wait_for_async
+ EM.run_deferred_callbacks
+ end
+
+ def run_in_eventmachine
+ failure = nil
+ EM.run do
+ begin
+ yield
+ rescue => ex
+ failure = ex
+ ensure
+ wait_for_async
+ EM.stop if EM.reactor_running?
+ end
+ end
+ raise failure if failure
+ end
+end
+
+module ConcurrentRubyConcurrencyHelpers
def wait_for_async
e = Concurrent.global_io_executor
until e.completed_task_count == e.scheduled_task_count
@@ -26,3 +60,11 @@ class ActionCable::TestCase < ActiveSupport::TestCase
wait_for_async
end
end
+
+class ActionCable::TestCase < ActiveSupport::TestCase
+ if ENV['FAYE'].present?
+ include EventMachineConcurrencyHelpers
+ else
+ include ConcurrentRubyConcurrencyHelpers
+ end
+end