aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/actioncable.gemspec3
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb4
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb2
-rw-r--r--actioncable/lib/action_cable/connection.rb5
-rw-r--r--actioncable/lib/action_cable/connection/base.rb32
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb150
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb59
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb94
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb22
-rw-r--r--actioncable/lib/action_cable/process/logging.rb7
-rw-r--r--actioncable/lib/action_cable/server.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb25
-rw-r--r--actioncable/lib/action_cable/server/connections.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb33
-rw-r--r--actioncable/test/channel/periodic_timers_test.rb2
-rw-r--r--actioncable/test/channel/stream_test.rb22
-rw-r--r--actioncable/test/client/echo_channel.rb18
-rw-r--r--actioncable/test/client_test.rb262
-rw-r--r--actioncable/test/connection/base_test.rb19
-rw-r--r--actioncable/test/connection/identifier_test.rb4
-rw-r--r--actioncable/test/connection/multiple_identifiers_test.rb4
-rw-r--r--actioncable/test/stubs/test_server.rb3
-rw-r--r--actioncable/test/subscription_adapter/common.rb3
-rw-r--r--actioncable/test/test_helper.rb26
28 files changed, 732 insertions, 113 deletions
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index a36acc8f6f..14f968f1ef 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -21,8 +21,7 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version
s.add_dependency 'coffee-rails', '~> 4.1.0'
- s.add_dependency 'eventmachine', '~> 1.0'
- s.add_dependency 'faye-websocket', '~> 0.10.0'
+ s.add_dependency 'nio4r', '~> 1.2'
s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_development_dependency 'em-hiredis', '~> 0.3.0'
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 7f0fb37afc..56597d02d7 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -27,14 +27,14 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
+ active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
def stop_periodic_timers
- active_periodic_timers.each { |timer| timer.cancel }
+ active_periodic_timers.each { |timer| timer.shutdown }
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index e2876ef6fa..a26373e387 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -75,7 +75,7 @@ module ActionCable
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- EM.next_tick do
+ Concurrent.global_io_executor.post do
pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index b672e00682..902efb07e2 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -5,12 +5,15 @@ module ActionCable
eager_autoload do
autoload :Authorization
autoload :Base
+ autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
autoload :MessageBuffer
- autoload :WebSocket
+ autoload :Stream
+ autoload :StreamEventLoop
autoload :Subscriptions
autoload :TaggedLoggerProxy
+ autoload :WebSocket
end
end
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index bb8850aaa0..b5f898436a 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,14 +49,14 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger
- delegate :worker_pool, :pubsub, to: :server
+ delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -70,10 +70,6 @@ module ActionCable
logger.info started_request_message
if websocket.possible? && allow_request_origin?
- websocket.on(:open) { |event| send_async :on_open }
- websocket.on(:message) { |event| on_message event.data }
- websocket.on(:close) { |event| send_async :on_close }
-
respond_to_successful_request
else
respond_to_invalid_request
@@ -121,6 +117,22 @@ module ActionCable
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
+ def on_open # :nodoc:
+ send_async :handle_open
+ end
+
+ def on_message(message) # :nodoc:
+ message_buffer.append message
+ end
+
+ def on_error(message) # :nodoc:
+ # ignore
+ end
+
+ def on_close(reason, code) # :nodoc:
+ send_async :handle_close
+ end
+
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@@ -139,7 +151,7 @@ module ActionCable
attr_reader :message_buffer
private
- def on_open
+ def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
@@ -150,11 +162,7 @@ module ActionCable
respond_to_invalid_request
end
- def on_message(message)
- message_buffer.append message
- end
-
- def on_close
+ def handle_close
logger.info finished_request_message
server.remove_connection(self)
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
new file mode 100644
index 0000000000..ef937d7c16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -0,0 +1,150 @@
+require 'websocket/driver'
+
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class ClientSocket # :nodoc:
+ def self.determine_url(env)
+ scheme = secure_request?(env) ? 'wss:' : 'ws:'
+ "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
+ end
+
+ def self.secure_request?(env)
+ return true if env['HTTPS'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
+ return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
+ return true if env['rack.url_scheme'] == 'https'
+
+ return false
+ end
+
+ CONNECTING = 0
+ OPEN = 1
+ CLOSING = 2
+ CLOSED = 3
+
+ attr_reader :env, :url
+
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+ @stream_event_loop = stream_event_loop
+
+ @url = ClientSocket.determine_url(@env)
+
+ @driver = @driver_started = nil
+ @close_params = ['', 1006]
+
+ @ready_state = CONNECTING
+
+ # The driver calls +env+, +url+, and +write+
+ @driver = ::WebSocket::Driver.rack(self)
+
+ @driver.on(:open) { |e| open }
+ @driver.on(:message) { |e| receive_message(e.data) }
+ @driver.on(:close) { |e| begin_close(e.reason, e.code) }
+ @driver.on(:error) { |e| emit_error(e.message) }
+
+ @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+
+ if callback = @env['async.callback']
+ callback.call([101, {}, @stream])
+ end
+ end
+
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @driver_started = true
+ @driver.start
+ end
+
+ def rack_response
+ start_driver
+ [ -1, {}, [] ]
+ end
+
+ def write(data)
+ @stream.write(data)
+ end
+
+ def transmit(message)
+ return false if @ready_state > OPEN
+ case message
+ when Numeric then @driver.text(message.to_s)
+ when String then @driver.text(message)
+ when Array then @driver.binary(message)
+ else false
+ end
+ end
+
+ def close(code = nil, reason = nil)
+ code ||= 1000
+ reason ||= ''
+
+ unless code == 1000 or (code >= 3000 and code <= 4999)
+ raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
+ "The code must be either 1000, or between 3000 and 4999. " +
+ "#{code} is neither."
+ end
+
+ @ready_state = CLOSING unless @ready_state == CLOSED
+ @driver.close(reason, code)
+ end
+
+ def parse(data)
+ @driver.parse(data)
+ end
+
+ def client_gone
+ finalize_close
+ end
+
+ def alive?
+ @ready_state == OPEN
+ end
+
+ private
+ def open
+ return unless @ready_state == CONNECTING
+ @ready_state = OPEN
+
+ @event_target.on_open
+ end
+
+ def receive_message(data)
+ return unless @ready_state == OPEN
+
+ @event_target.on_message(data)
+ end
+
+ def emit_error(message)
+ return if @ready_state >= CLOSING
+
+ @event_target.on_error(message)
+ end
+
+ def begin_close(reason, code)
+ return if @ready_state == CLOSED
+ @ready_state = CLOSING
+ @close_params = [reason, code]
+
+ if @stream
+ @stream.shutdown
+ else
+ finalize_close
+ end
+ end
+
+ def finalize_close
+ return if @ready_state == CLOSED
+ @ready_state = CLOSED
+
+ @event_target.on_close(*@close_params)
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 54ed7672d2..27826792b3 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_channel, callback) }
+ Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
new file mode 100644
index 0000000000..ace250cd16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -0,0 +1,59 @@
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class Stream
+ def initialize(event_loop, socket)
+ @event_loop = event_loop
+ @socket_object = socket
+ @stream_send = socket.env['stream.send']
+
+ @rack_hijack_io = nil
+
+ hijack_rack_socket
+ end
+
+ def each(&callback)
+ @stream_send ||= callback
+ end
+
+ def close
+ shutdown
+ @socket_object.client_gone
+ end
+
+ def shutdown
+ clean_rack_hijack
+ end
+
+ def write(data)
+ return @rack_hijack_io.write(data) if @rack_hijack_io
+ return @stream_send.call(data) if @stream_send
+ rescue EOFError
+ @socket_object.client_gone
+ end
+
+ def receive(data)
+ @socket_object.parse(data)
+ end
+
+ private
+ def hijack_rack_socket
+ return unless @socket_object.env['rack.hijack']
+
+ @socket_object.env['rack.hijack'].call
+ @rack_hijack_io = @socket_object.env['rack.hijack_io']
+
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+
+ def clean_rack_hijack
+ return unless @rack_hijack_io
+ @event_loop.detach(@rack_hijack_io, self)
+ @rack_hijack_io = nil
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
new file mode 100644
index 0000000000..e6335082d2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -0,0 +1,94 @@
+require 'nio'
+require 'thread'
+
+module ActionCable
+ module Connection
+ class StreamEventLoop
+ def initialize
+ @nio = @thread = nil
+ @map = {}
+ @stopping = false
+ @todo = Queue.new
+
+ @spawn_mutex = Mutex.new
+ spawn
+ end
+
+ def attach(io, stream)
+ @todo << lambda do
+ @map[io] = stream
+ @nio.register(io, :r)
+ end
+ wakeup
+ end
+
+ def detach(io, stream)
+ @todo << lambda do
+ @nio.deregister io
+ @map.delete io
+ end
+ wakeup
+ end
+
+ def stop
+ @stopping = true
+ wakeup if @nio
+ end
+
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
+
+ return true
+ end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
+
+ monitors.each do |monitor|
+ io = monitor.io
+ stream = @map[io]
+
+ begin
+ stream.receive io.read_nonblock(4096)
+ rescue IO::WaitReadable
+ next
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 670d5690ae..5e89fb9b72 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -1,13 +1,11 @@
-require 'faye/websocket'
+require 'websocket/driver'
module ActionCable
module Connection
- # Decorate the Faye::WebSocket with helpers we need.
+ # Wrap the real socket to minimize the externally-presented API
class WebSocket
- delegate :rack_response, :close, :on, to: :websocket
-
- def initialize(env)
- @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
+ def initialize(env, event_target, stream_event_loop)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
end
def possible?
@@ -15,11 +13,19 @@ module ActionCable
end
def alive?
- websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
+ websocket && websocket.alive?
end
def transmit(data)
- websocket.send data
+ websocket.transmit data
+ end
+
+ def close
+ websocket.close
+ end
+
+ def rack_response
+ websocket.rack_response
end
protected
diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb
deleted file mode 100644
index dce637b3ca..0000000000
--- a/actioncable/lib/action_cable/process/logging.rb
+++ /dev/null
@@ -1,7 +0,0 @@
-require 'action_cable/server'
-require 'eventmachine'
-
-EM.error_handler do |e|
- puts "Error raised inside the event loop: #{e.message}"
- puts e.backtrace.join("\n")
-end
diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb
index a2a89d5f1e..bd6a3826a3 100644
--- a/actioncable/lib/action_cable/server.rb
+++ b/actioncable/lib/action_cable/server.rb
@@ -1,7 +1,3 @@
-require 'eventmachine'
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
module ActionCable
module Server
extend ActiveSupport::Autoload
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index 3385a4c9f3..fe48c112df 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,3 +1,5 @@
+require 'thread'
+
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -13,7 +15,12 @@ module ActionCable
def self.logger; config.logger; end
delegate :logger, to: :config
+ attr_reader :mutex
+
def initialize
+ @mutex = Mutex.new
+
+ @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by rack to setup the server.
@@ -29,25 +36,31 @@ module ActionCable
# Gateway to RemoteConnections. See that class for details.
def remote_connections
- @remote_connections ||= RemoteConnections.new(self)
+ @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
+ end
+
+ def stream_event_loop
+ @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
- @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
+ @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
# Requires and returns a hash of all the channel class constants keyed by name.
def channel_classes
- @channel_classes ||= begin
- config.channel_paths.each { |channel_path| require channel_path }
- config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ @channel_classes || @mutex.synchronize do
+ @channel_classes ||= begin
+ config.channel_paths.each { |channel_path| require channel_path }
+ config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ end
end
end
# Adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= config.pubsub_adapter.new(self)
+ @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 47dcea8c20..8671dd5ebd 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -22,11 +22,9 @@ module ActionCable
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
- EM.next_tick do
- @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
- EM.next_tick { connections.map(&:beat) }
- end
- end
+ @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
+ Concurrent.global_io_executor.post { connections.map(&:beat) }
+ end.tap(&:execute)
end
def open_connections_statistics
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index 85d4892e4c..cca6894289 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -4,17 +4,17 @@ module ActionCable
module SubscriptionAdapter
class Async < Inline # :nodoc:
private
- def subscriber_map
- @subscriber_map ||= AsyncSubscriberMap.new
+ def new_subscriber_map
+ AsyncSubscriberMap.new
end
class AsyncSubscriberMap < SubscriberMap
def add_subscriber(*)
- ::EM.next_tick { super }
+ Concurrent.global_io_executor.post { super }
end
def invoke_callback(*)
- ::EM.next_tick { super }
+ Concurrent.global_io_executor.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
index 4a2a8d23a2..81357faead 100644
--- a/actioncable/lib/action_cable/subscription_adapter/inline.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -1,6 +1,11 @@
module ActionCable
module SubscriptionAdapter
class Inline < Base # :nodoc:
+ def initialize(*)
+ super
+ @subscriber_map = nil
+ end
+
def broadcast(channel, payload)
subscriber_map.broadcast(channel, payload)
end
@@ -19,7 +24,11 @@ module ActionCable
private
def subscriber_map
- @subscriber_map ||= SubscriberMap.new
+ @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
+ end
+
+ def new_subscriber_map
+ SubscriberMap.new
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 78f8aeb599..abaeb92e54 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -5,6 +5,11 @@ require 'thread'
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ end
+
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
@@ -37,7 +42,7 @@ module ActionCable
private
def listener
- @listener ||= Listener.new(self)
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
class Listener < SubscriberMap
@@ -63,7 +68,7 @@ module ActionCable
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
- ::EM.next_tick(&callback) if callback
+ Concurrent.global_io_executor << callback if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@@ -93,7 +98,7 @@ module ActionCable
end
def invoke_callback(*)
- ::EM.next_tick { super }
+ Concurrent.global_io_executor.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 3b86354621..560b79df16 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,11 +1,23 @@
+require 'thread'
+
gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
require 'em-hiredis'
require 'redis'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
+ @@mutex = Mutex.new
+
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
@@ -27,15 +39,28 @@ module ActionCable
private
def redis_connection_for_subscriptions
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
+ ensure_reactor_running
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
end
end
end
def redis_connection_for_broadcasts
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
end
end
end
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb
index 1590a12f09..64f0247cd6 100644
--- a/actioncable/test/channel/periodic_timers_test.rb
+++ b/actioncable/test/channel/periodic_timers_test.rb
@@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
end
test "timer start and stop" do
- EventMachine::PeriodicTimer.expects(:new).times(2).returns(true)
+ Concurrent::TimerTask.expects(:new).times(2).returns(true)
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
channel.expects(:stop_periodic_timers).once
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index 3fa2b291b7..947efd96d4 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -31,9 +31,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "stream_for" do
run_in_eventmachine do
connection = TestConnection.new
- EM.next_tick do
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
- end
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, ""
channel.stream_for Room.new(1)
@@ -41,39 +39,35 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
end
test "stream_from subscription confirmation" do
- EM.run do
+ run_in_eventmachine do
connection = TestConnection.new
ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission
- EM::Timer.new(0.1) do
- expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
- connection.transmit(expected)
+ wait_for_async
- assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ connection.transmit(expected)
- EM.run_deferred_callbacks
- EM.stop
- end
+ assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
end
end
test "subscription confirmation should only be sent out once" do
- EM.run do
+ run_in_eventmachine do
connection = TestConnection.new
channel = ChatChannel.new connection, "test_channel"
channel.send_confirmation
channel.send_confirmation
- EM.run_deferred_callbacks
+ wait_for_async
expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
assert_equal 1, connection.transmissions.size
- EM.stop
end
end
diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb
new file mode 100644
index 0000000000..63e35f194a
--- /dev/null
+++ b/actioncable/test/client/echo_channel.rb
@@ -0,0 +1,18 @@
+class EchoChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from "global"
+ end
+
+ def ding(data)
+ transmit(dong: data['message'])
+ end
+
+ def delay(data)
+ sleep 1
+ transmit(dong: data['message'])
+ end
+
+ def bulk(data)
+ ActionCable.server.broadcast "global", wide: data['message']
+ end
+end
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
new file mode 100644
index 0000000000..3d9690fb01
--- /dev/null
+++ b/actioncable/test/client_test.rb
@@ -0,0 +1,262 @@
+require 'test_helper'
+require 'concurrent'
+
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+
+require 'faye/websocket'
+require 'json'
+
+class ClientTest < ActionCable::TestCase
+ WAIT_WHEN_EXPECTING_EVENT = 3
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+
+ ActionCable.instance_variable_set(:@server, nil)
+ server = ActionCable.server
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+
+ server.config.cable = { adapter: 'async' }.with_indifferent_access
+
+ # and now the "real" setup for our test:
+ server.config.disable_request_forgery_protection = true
+ server.config.channel_load_paths = [File.expand_path('client', __dir__)]
+
+ @reactor_mutex = Mutex.new
+ @reactor_users = 0
+ @reactor_running = Concurrent::Event.new
+
+ # faye-websocket is warning-rich
+ @previous_verbose, $VERBOSE = $VERBOSE, nil
+ end
+
+ def teardown
+ if @reactor_running.set?
+ EM.stop
+ end
+
+ $VERBOSE = @previous_verbose
+
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+
+ def with_puma_server(rack_app = ActionCable.server, port = 3099)
+ server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
+ server.add_tcp_listener '127.0.0.1', port
+ server.min_threads = 1
+ server.max_threads = 4
+
+ t = Thread.new { server.run.join }
+ yield port
+
+ ensure
+ server.stop(true) if server
+ t.join if t
+ end
+
+ def start_event_machine
+ @reactor_mutex.synchronize do
+ unless @reactor_running.set?
+ @reactor ||= Thread.new do
+ EM.next_tick do
+ @reactor_running.set
+ end
+ EM.run
+ end
+ @reactor_running.wait(WAIT_WHEN_EXPECTING_EVENT)
+ end
+ @reactor_users += 1
+ end
+ end
+
+ def stop_event_machine
+ @reactor_mutex.synchronize do
+ @reactor_users -= 1
+ if @reactor_users < 1
+ @reactor_running.reset
+ EM.stop
+ @reactor = nil
+ end
+ end
+ end
+
+ class SyncClient
+ attr_reader :pings
+
+ def initialize(em_controller, port)
+ em_controller.start_event_machine
+
+ @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
+ @messages = Queue.new
+ @closed = Concurrent::Event.new
+ @has_messages = Concurrent::Event.new
+ @pings = 0
+
+ open = Concurrent::Event.new
+ error = nil
+
+ @ws.on(:error) do |event|
+ if open.set?
+ @messages << RuntimeError.new(event.message)
+ else
+ error = event.message
+ open.set
+ end
+ end
+
+ @ws.on(:open) do |event|
+ open.set
+ end
+
+ @ws.on(:message) do |event|
+ hash = JSON.parse(event.data)
+ if hash['identifier'] == '_ping'
+ @pings += 1
+ else
+ @messages << hash
+ @has_messages.set
+ end
+ end
+
+ @ws.on(:close) do |event|
+ em_controller.stop_event_machine
+ @closed.set
+ end
+
+ open.wait(WAIT_WHEN_EXPECTING_EVENT)
+ raise error if error
+ end
+
+ def read_message
+ @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty?
+ @has_messages.reset if @messages.size < 2
+
+ msg = @messages.pop(true)
+ raise msg if msg.is_a?(Exception)
+
+ msg
+ end
+
+ def read_messages(expected_size = 0)
+ list = []
+ loop do
+ @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT)
+ if @has_messages.set?
+ list << read_message
+ else
+ break
+ end
+ end
+ list
+ end
+
+ def send_message(hash)
+ @ws.send(JSON.dump(hash))
+ end
+
+ def close
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+
+ unless @messages.empty?
+ raise "#{@messages.size} messages unprocessed"
+ end
+
+ @ws.close
+ @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
+ end
+
+ def close!
+ sock = BasicSocket.for_fd(@ws.instance_variable_get(:@stream).detach)
+
+ # Force a TCP reset
+ sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, [1, 0].pack('ii'))
+
+ sock.close
+ end
+ end
+
+ def faye_client(port)
+ SyncClient.new(self, port)
+ end
+
+ def test_single_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close
+ end
+ end
+
+ def test_interacting_clients
+ with_puma_server do |port|
+ clients = 10.times.map { faye_client(port) }
+
+ barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
+ barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
+
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello')
+ barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
+ assert_equal clients.size, c.read_messages(clients.size).size
+ } }.each(&:wait!)
+
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+
+ def test_many_clients
+ with_puma_server do |port|
+ clients = 200.times.map { faye_client(port) }
+
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ } }.each(&:wait!)
+
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+
+ def test_disappearing_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
+ c.close! # disappear before write
+
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close! # disappear before read
+ end
+ end
+end
diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb
index 182562db82..e2b017a9a1 100644
--- a/actioncable/test/connection/base_test.rb
+++ b/actioncable/test/connection/base_test.rb
@@ -37,6 +37,8 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
connection.process
assert connection.websocket.possible?
+
+ wait_for_async
assert connection.websocket.alive?
end
end
@@ -53,16 +55,15 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
test "on connection open" do
run_in_eventmachine do
connection = open_connection
- connection.process
connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
connection.message_buffer.expects(:process!)
- # Allow EM to run on_open callback
- EM.next_tick do
- assert_equal [ connection ], @server.connections
- assert connection.connected
- end
+ connection.process
+ wait_for_async
+
+ assert_equal [ connection ], @server.connections
+ assert connection.connected
end
end
@@ -72,12 +73,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
connection.process
# Setup the connection
- EventMachine.stubs(:add_periodic_timer).returns(true)
- connection.send :on_open
+ Concurrent::TimerTask.stubs(:new).returns(true)
+ connection.send :handle_open
assert connection.connected
connection.subscriptions.expects(:unsubscribe_from_all)
- connection.send :on_close
+ connection.send :handle_close
assert ! connection.connected
assert_equal [], @server.connections
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
index a110dfdee0..1019ad541e 100644
--- a/actioncable/test/connection/identifier_test.rb
+++ b/actioncable/test/connection/identifier_test.rb
@@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
@connection = Connection.new(server, env)
@connection.process
- @connection.send :on_open
+ @connection.send :handle_open
end
def close_connection
- @connection.send :on_close
+ @connection.send :handle_close
end
end
diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb
index 55a9f96cb3..e9bb4e6d7f 100644
--- a/actioncable/test/connection/multiple_identifiers_test.rb
+++ b/actioncable/test/connection/multiple_identifiers_test.rb
@@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase
@connection = Connection.new(server, env)
@connection.process
- @connection.send :on_open
+ @connection.send :handle_open
end
def close_connection
- @connection.send :on_close
+ @connection.send :handle_close
end
end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index 6e6541a952..56d132b30a 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -14,6 +14,7 @@ class TestServer
@config.subscription_adapter.new(self)
end
- def send_async
+ def stream_event_loop
+ @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
end
end
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
index d4a13be889..361858784e 100644
--- a/actioncable/test/subscription_adapter/common.rb
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -1,7 +1,6 @@
require 'test_helper'
require 'concurrent'
-require 'action_cable/process/logging'
require 'active_support/core_ext/hash/indifferent_access'
require 'pathname'
@@ -24,8 +23,6 @@ module CommonSubscriptionAdapterTest
# and now the "real" setup for our test:
- spawn_eventmachine
-
server.config.cable = cable_config.with_indifferent_access
adapter_klass = server.config.pubsub_adapter
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 6636ce078b..8ddbd4e764 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -13,28 +13,16 @@ require 'rack/mock'
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
-require 'faye/websocket'
-class << Faye::WebSocket
- remove_method :ensure_reactor_running
-
- # We don't want Faye to start the EM reactor in tests because it makes testing much harder.
- # We want to be able to start and stop EM loop in tests to make things simpler.
- def ensure_reactor_running
- # no-op
- end
-end
-
class ActionCable::TestCase < ActiveSupport::TestCase
- def run_in_eventmachine
- EM.run do
- yield
-
- EM.run_deferred_callbacks
- EM.stop
+ def wait_for_async
+ e = Concurrent.global_io_executor
+ until e.completed_task_count == e.scheduled_task_count
+ sleep 0.1
end
end
- def spawn_eventmachine
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ def run_in_eventmachine
+ yield
+ wait_for_async
end
end