aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/README.md9
-rw-r--r--actioncable/actioncable.gemspec3
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb4
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb2
-rw-r--r--actioncable/lib/action_cable/connection.rb5
-rw-r--r--actioncable/lib/action_cable/connection/base.rb32
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb152
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb59
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb68
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb22
-rw-r--r--actioncable/lib/action_cable/process/logging.rb7
-rw-r--r--actioncable/lib/action_cable/server.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb4
-rw-r--r--actioncable/lib/action_cable/server/connections.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb16
-rw-r--r--actioncable/test/channel/periodic_timers_test.rb2
-rw-r--r--actioncable/test/channel/stream_test.rb22
-rw-r--r--actioncable/test/connection/base_test.rb19
-rw-r--r--actioncable/test/connection/identifier_test.rb4
-rw-r--r--actioncable/test/connection/multiple_identifiers_test.rb4
-rw-r--r--actioncable/test/stubs/test_server.rb3
-rw-r--r--actioncable/test/subscription_adapter/common.rb3
-rw-r--r--actioncable/test/test_helper.rb26
26 files changed, 103 insertions, 387 deletions
diff --git a/actioncable/README.md b/actioncable/README.md
index 63e328321b..cad71ddf94 100644
--- a/actioncable/README.md
+++ b/actioncable/README.md
@@ -443,11 +443,10 @@ The Ruby side of things is built on top of [faye-websocket](https://github.com/f
## Deployment
-Action Cable is powered by a combination of EventMachine and threads. The
-framework plumbing needed for connection handling is handled in the
-EventMachine loop, but the actual channel, user-specified, work is handled
-in a normal Ruby thread. This means you can use all your regular Rails models
-with no problem, as long as you haven't committed any thread-safety sins.
+Action Cable is powered by a combination of websockets and threads. All of the
+connection management is handled internally by utilizing Ruby’s native thread
+support, which means you can use all your regular Rails models with no problems
+as long as you haven’t committed any thread-safety sins.
But this also means that Action Cable needs to run in its own server process.
So you'll have one set of server processes for your normal web work, and another
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index 14f968f1ef..a36acc8f6f 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -21,7 +21,8 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version
s.add_dependency 'coffee-rails', '~> 4.1.0'
- s.add_dependency 'nio4r', '~> 1.2'
+ s.add_dependency 'eventmachine', '~> 1.0'
+ s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_development_dependency 'em-hiredis', '~> 0.3.0'
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 56597d02d7..7f0fb37afc 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -27,14 +27,14 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
+ active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
def stop_periodic_timers
- active_periodic_timers.each { |timer| timer.shutdown }
+ active_periodic_timers.each { |timer| timer.cancel }
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index a26373e387..e2876ef6fa 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -75,7 +75,7 @@ module ActionCable
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- Concurrent.global_io_executor.post do
+ EM.next_tick do
pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index 902efb07e2..b672e00682 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -5,15 +5,12 @@ module ActionCable
eager_autoload do
autoload :Authorization
autoload :Base
- autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
autoload :MessageBuffer
- autoload :Stream
- autoload :StreamEventLoop
+ autoload :WebSocket
autoload :Subscriptions
autoload :TaggedLoggerProxy
- autoload :WebSocket
end
end
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 0016d1a1a4..bb8850aaa0 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,14 +49,14 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger
- delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
+ delegate :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
+ @websocket = ActionCable::Connection::WebSocket.new(env)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -70,6 +70,10 @@ module ActionCable
logger.info started_request_message
if websocket.possible? && allow_request_origin?
+ websocket.on(:open) { |event| send_async :on_open }
+ websocket.on(:message) { |event| on_message event.data }
+ websocket.on(:close) { |event| send_async :on_close }
+
respond_to_successful_request
else
respond_to_invalid_request
@@ -117,22 +121,6 @@ module ActionCable
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
- def on_open # :nodoc:
- send_async :handle_open
- end
-
- def on_message(message) # :nodoc:
- message_buffer.append message
- end
-
- def on_error(message) # :nodoc:
- # ignore
- end
-
- def on_close # :nodoc:
- send_async :handle_close
- end
-
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@@ -151,7 +139,7 @@ module ActionCable
attr_reader :message_buffer
private
- def handle_open
+ def on_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
@@ -162,7 +150,11 @@ module ActionCable
respond_to_invalid_request
end
- def handle_close
+ def on_message(message)
+ message_buffer.append message
+ end
+
+ def on_close
logger.info finished_request_message
server.remove_connection(self)
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
deleted file mode 100644
index 62dd753646..0000000000
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ /dev/null
@@ -1,152 +0,0 @@
-require 'websocket/driver'
-
-module ActionCable
- module Connection
- #--
- # This class is heavily based on faye-websocket-ruby
- #
- # Copyright (c) 2010-2015 James Coglan
- class ClientSocket # :nodoc:
- def self.determine_url(env)
- scheme = secure_request?(env) ? 'wss:' : 'ws:'
- "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
- end
-
- def self.secure_request?(env)
- return true if env['HTTPS'] == 'on'
- return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
- return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
- return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
- return true if env['rack.url_scheme'] == 'https'
-
- return false
- end
-
- CONNECTING = 0
- OPEN = 1
- CLOSING = 2
- CLOSED = 3
-
- attr_reader :env, :url
-
- def initialize(env, event_target, stream_event_loop)
- @env = env
- @event_target = event_target
- @stream_event_loop = stream_event_loop
-
- @url = ClientSocket.determine_url(@env)
-
- @driver = @driver_started = nil
-
- @ready_state = CONNECTING
-
- # The driver calls +env+, +url+, and +write+
- @driver = ::WebSocket::Driver.rack(self)
-
- @driver.on(:open) { |e| open }
- @driver.on(:message) { |e| receive_message(e.data) }
- @driver.on(:close) { |e| begin_close(e.reason, e.code) }
- @driver.on(:error) { |e| emit_error(e.message) }
-
- @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
-
- if callback = @env['async.callback']
- callback.call([101, {}, @stream])
- end
- end
-
- def start_driver
- return if @driver.nil? || @driver_started
- @driver_started = true
- @driver.start
- end
-
- def rack_response
- start_driver
- [ -1, {}, [] ]
- end
-
- def write(data)
- @stream.write(data)
- end
-
- def transmit(message)
- return false if @ready_state > OPEN
- case message
- when Numeric then @driver.text(message.to_s)
- when String then @driver.text(message)
- when Array then @driver.binary(message)
- else false
- end
- end
-
- def close(code = nil, reason = nil)
- code ||= 1000
- reason ||= ''
-
- unless code == 1000 or (code >= 3000 and code <= 4999)
- raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
- "The code must be either 1000, or between 3000 and 4999. " +
- "#{code} is neither."
- end
-
- @ready_state = CLOSING unless @ready_state == CLOSED
- @driver.close(reason, code)
- end
-
- def parse(data)
- @driver.parse(data)
- end
-
- def client_gone
- finalize_close
- end
-
- def alive?
- @ready_state == OPEN
- end
-
- private
- def open
- return unless @ready_state == CONNECTING
- @ready_state = OPEN
-
- @event_target.on_open
- end
-
- def receive_message(data)
- return unless @ready_state == OPEN
-
- @event_target.on_message(data)
- end
-
- def emit_error(message)
- return if @ready_state >= CLOSING
-
- @event_target.on_error(message)
- end
-
- def begin_close(reason, code)
- return if @ready_state == CLOSED
- @ready_state = CLOSING
- @close_params = [reason, code]
-
- if @stream
- @stream.shutdown
- else
- finalize_close
- end
- end
-
- def finalize_close
- return if @ready_state == CLOSED
- @ready_state = CLOSED
-
- reason = @close_params ? @close_params[0] : ''
- code = @close_params ? @close_params[1] : 1006
-
- @event_target.on_close(code, reason)
- end
- end
- end
-end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 27826792b3..54ed7672d2 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
+ EM.next_tick { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
deleted file mode 100644
index ace250cd16..0000000000
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ /dev/null
@@ -1,59 +0,0 @@
-module ActionCable
- module Connection
- #--
- # This class is heavily based on faye-websocket-ruby
- #
- # Copyright (c) 2010-2015 James Coglan
- class Stream
- def initialize(event_loop, socket)
- @event_loop = event_loop
- @socket_object = socket
- @stream_send = socket.env['stream.send']
-
- @rack_hijack_io = nil
-
- hijack_rack_socket
- end
-
- def each(&callback)
- @stream_send ||= callback
- end
-
- def close
- shutdown
- @socket_object.client_gone
- end
-
- def shutdown
- clean_rack_hijack
- end
-
- def write(data)
- return @rack_hijack_io.write(data) if @rack_hijack_io
- return @stream_send.call(data) if @stream_send
- rescue EOFError
- @socket_object.client_gone
- end
-
- def receive(data)
- @socket_object.parse(data)
- end
-
- private
- def hijack_rack_socket
- return unless @socket_object.env['rack.hijack']
-
- @socket_object.env['rack.hijack'].call
- @rack_hijack_io = @socket_object.env['rack.hijack_io']
-
- @event_loop.attach(@rack_hijack_io, self)
- end
-
- def clean_rack_hijack
- return unless @rack_hijack_io
- @event_loop.detach(@rack_hijack_io, self)
- @rack_hijack_io = nil
- end
- end
- end
-end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
deleted file mode 100644
index f773814973..0000000000
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ /dev/null
@@ -1,68 +0,0 @@
-require 'nio'
-
-module ActionCable
- module Connection
- class StreamEventLoop
- def initialize
- @nio = NIO::Selector.new
- @map = {}
- @stopping = false
- @todo = Queue.new
-
- Thread.new do
- Thread.current.abort_on_exception = true
- run
- end
- end
-
- def attach(io, stream)
- @todo << lambda do
- @map[io] = stream
- @nio.register(io, :r)
- end
- @nio.wakeup
- end
-
- def detach(io, stream)
- @todo << lambda do
- @nio.deregister(io)
- @map.delete io
- end
- @nio.wakeup
- end
-
- def stop
- @stopping = true
- @nio.wakeup
- end
-
- def run
- loop do
- if @stopping
- @nio.close
- break
- end
-
- until @todo.empty?
- @todo.pop(true).call
- end
-
- if monitors = @nio.select
- monitors.each do |monitor|
- io = monitor.io
- stream = @map[io]
-
- begin
- stream.receive io.read_nonblock(4096)
- rescue IO::WaitReadable
- next
- rescue EOFError
- stream.close
- end
- end
- end
- end
- end
- end
- end
-end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 5e89fb9b72..670d5690ae 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -1,11 +1,13 @@
-require 'websocket/driver'
+require 'faye/websocket'
module ActionCable
module Connection
- # Wrap the real socket to minimize the externally-presented API
+ # Decorate the Faye::WebSocket with helpers we need.
class WebSocket
- def initialize(env, event_target, stream_event_loop)
- @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
+ delegate :rack_response, :close, :on, to: :websocket
+
+ def initialize(env)
+ @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
end
def possible?
@@ -13,19 +15,11 @@ module ActionCable
end
def alive?
- websocket && websocket.alive?
+ websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
end
def transmit(data)
- websocket.transmit data
- end
-
- def close
- websocket.close
- end
-
- def rack_response
- websocket.rack_response
+ websocket.send data
end
protected
diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb
new file mode 100644
index 0000000000..dce637b3ca
--- /dev/null
+++ b/actioncable/lib/action_cable/process/logging.rb
@@ -0,0 +1,7 @@
+require 'action_cable/server'
+require 'eventmachine'
+
+EM.error_handler do |e|
+ puts "Error raised inside the event loop: #{e.message}"
+ puts e.backtrace.join("\n")
+end
diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb
index bd6a3826a3..a2a89d5f1e 100644
--- a/actioncable/lib/action_cable/server.rb
+++ b/actioncable/lib/action_cable/server.rb
@@ -1,3 +1,7 @@
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
module ActionCable
module Server
extend ActiveSupport::Autoload
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index b00abd208c..3385a4c9f3 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -32,10 +32,6 @@ module ActionCable
@remote_connections ||= RemoteConnections.new(self)
end
- def stream_event_loop
- @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
- end
-
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 8671dd5ebd..47dcea8c20 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -22,9 +22,11 @@ module ActionCable
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
- @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
- Concurrent.global_io_executor.post { connections.map(&:beat) }
- end.tap(&:execute)
+ EM.next_tick do
+ @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
+ EM.next_tick { connections.map(&:beat) }
+ end
+ end
end
def open_connections_statistics
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index c88b03947a..85d4892e4c 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -10,11 +10,11 @@ module ActionCable
class AsyncSubscriberMap < SubscriberMap
def add_subscriber(*)
- Concurrent.global_io_executor.post { super }
+ ::EM.next_tick { super }
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ ::EM.next_tick { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 3ce1bbed68..78f8aeb599 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -63,7 +63,7 @@ module ActionCable
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
- Concurrent.global_io_executor << callback if callback
+ ::EM.next_tick(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@@ -93,7 +93,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ ::EM.next_tick { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index a035e3988d..3b86354621 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,18 +1,11 @@
-require 'thread'
-
gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
require 'em-hiredis'
require 'redis'
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
- @@mutex = Mutex.new
-
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
@@ -34,7 +27,6 @@ module ActionCable
private
def redis_connection_for_subscriptions
- ensure_reactor_running
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
redis.on(:reconnect_failed) do
@logger.info "[ActionCable] Redis reconnect failed."
@@ -45,14 +37,6 @@ module ActionCable
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
end
-
- def ensure_reactor_running
- return if EventMachine.reactor_running?
- @@mutex.synchronize do
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
- Thread.pass until EventMachine.reactor_running?
- end
- end
end
end
end
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb
index 64f0247cd6..1590a12f09 100644
--- a/actioncable/test/channel/periodic_timers_test.rb
+++ b/actioncable/test/channel/periodic_timers_test.rb
@@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
end
test "timer start and stop" do
- Concurrent::TimerTask.expects(:new).times(2).returns(true)
+ EventMachine::PeriodicTimer.expects(:new).times(2).returns(true)
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
channel.expects(:stop_periodic_timers).once
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index 947efd96d4..3fa2b291b7 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -31,7 +31,9 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "stream_for" do
run_in_eventmachine do
connection = TestConnection.new
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
+ EM.next_tick do
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
+ end
channel = ChatChannel.new connection, ""
channel.stream_for Room.new(1)
@@ -39,35 +41,39 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
end
test "stream_from subscription confirmation" do
- run_in_eventmachine do
+ EM.run do
connection = TestConnection.new
ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission
- wait_for_async
+ EM::Timer.new(0.1) do
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ connection.transmit(expected)
- expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
- connection.transmit(expected)
+ assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
- assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
+ EM.run_deferred_callbacks
+ EM.stop
+ end
end
end
test "subscription confirmation should only be sent out once" do
- run_in_eventmachine do
+ EM.run do
connection = TestConnection.new
channel = ChatChannel.new connection, "test_channel"
channel.send_confirmation
channel.send_confirmation
- wait_for_async
+ EM.run_deferred_callbacks
expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
assert_equal 1, connection.transmissions.size
+ EM.stop
end
end
diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb
index e2b017a9a1..182562db82 100644
--- a/actioncable/test/connection/base_test.rb
+++ b/actioncable/test/connection/base_test.rb
@@ -37,8 +37,6 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
connection.process
assert connection.websocket.possible?
-
- wait_for_async
assert connection.websocket.alive?
end
end
@@ -55,15 +53,16 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
test "on connection open" do
run_in_eventmachine do
connection = open_connection
+ connection.process
connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
connection.message_buffer.expects(:process!)
- connection.process
- wait_for_async
-
- assert_equal [ connection ], @server.connections
- assert connection.connected
+ # Allow EM to run on_open callback
+ EM.next_tick do
+ assert_equal [ connection ], @server.connections
+ assert connection.connected
+ end
end
end
@@ -73,12 +72,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
connection.process
# Setup the connection
- Concurrent::TimerTask.stubs(:new).returns(true)
- connection.send :handle_open
+ EventMachine.stubs(:add_periodic_timer).returns(true)
+ connection.send :on_open
assert connection.connected
connection.subscriptions.expects(:unsubscribe_from_all)
- connection.send :handle_close
+ connection.send :on_close
assert ! connection.connected
assert_equal [], @server.connections
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
index 1019ad541e..a110dfdee0 100644
--- a/actioncable/test/connection/identifier_test.rb
+++ b/actioncable/test/connection/identifier_test.rb
@@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
@connection = Connection.new(server, env)
@connection.process
- @connection.send :handle_open
+ @connection.send :on_open
end
def close_connection
- @connection.send :handle_close
+ @connection.send :on_close
end
end
diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb
index e9bb4e6d7f..55a9f96cb3 100644
--- a/actioncable/test/connection/multiple_identifiers_test.rb
+++ b/actioncable/test/connection/multiple_identifiers_test.rb
@@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase
@connection = Connection.new(server, env)
@connection.process
- @connection.send :handle_open
+ @connection.send :on_open
end
def close_connection
- @connection.send :handle_close
+ @connection.send :on_close
end
end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index 56d132b30a..6e6541a952 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -14,7 +14,6 @@ class TestServer
@config.subscription_adapter.new(self)
end
- def stream_event_loop
- @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
+ def send_async
end
end
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
index 361858784e..d4a13be889 100644
--- a/actioncable/test/subscription_adapter/common.rb
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -1,6 +1,7 @@
require 'test_helper'
require 'concurrent'
+require 'action_cable/process/logging'
require 'active_support/core_ext/hash/indifferent_access'
require 'pathname'
@@ -23,6 +24,8 @@ module CommonSubscriptionAdapterTest
# and now the "real" setup for our test:
+ spawn_eventmachine
+
server.config.cable = cable_config.with_indifferent_access
adapter_klass = server.config.pubsub_adapter
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 8ddbd4e764..6636ce078b 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -13,16 +13,28 @@ require 'rack/mock'
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
+require 'faye/websocket'
+class << Faye::WebSocket
+ remove_method :ensure_reactor_running
+
+ # We don't want Faye to start the EM reactor in tests because it makes testing much harder.
+ # We want to be able to start and stop EM loop in tests to make things simpler.
+ def ensure_reactor_running
+ # no-op
+ end
+end
+
class ActionCable::TestCase < ActiveSupport::TestCase
- def wait_for_async
- e = Concurrent.global_io_executor
- until e.completed_task_count == e.scheduled_task_count
- sleep 0.1
+ def run_in_eventmachine
+ EM.run do
+ yield
+
+ EM.run_deferred_callbacks
+ EM.stop
end
end
- def run_in_eventmachine
- yield
- wait_for_async
+ def spawn_eventmachine
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
end
end