aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/.gitignore2
-rw-r--r--actioncable/README.md29
-rw-r--r--actioncable/Rakefile44
-rw-r--r--actioncable/actioncable.gemspec10
-rw-r--r--actioncable/app/assets/javascripts/action_cable.coffee.erb (renamed from actioncable/lib/assets/javascripts/action_cable.coffee.erb)2
-rw-r--r--actioncable/app/assets/javascripts/action_cable/connection.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/connection.coffee)0
-rw-r--r--actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee)0
-rw-r--r--actioncable/app/assets/javascripts/action_cable/consumer.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/consumer.coffee)8
-rw-r--r--actioncable/app/assets/javascripts/action_cable/subscription.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/subscription.coffee)0
-rw-r--r--actioncable/app/assets/javascripts/action_cable/subscriptions.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee)0
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb4
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb4
-rw-r--r--actioncable/lib/action_cable/connection.rb5
-rw-r--r--actioncable/lib/action_cable/connection/base.rb32
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb150
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb59
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb94
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb22
-rw-r--r--actioncable/lib/action_cable/gem_version.rb2
-rw-r--r--actioncable/lib/action_cable/process/logging.rb7
-rw-r--r--actioncable/lib/action_cable/server.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb25
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb8
-rw-r--r--actioncable/lib/action_cable/server/connections.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter.rb5
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb22
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb67
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb35
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb78
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb156
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb53
-rw-r--r--actioncable/test/channel/periodic_timers_test.rb2
-rw-r--r--actioncable/test/channel/stream_test.rb22
-rw-r--r--actioncable/test/client/echo_channel.rb18
-rw-r--r--actioncable/test/client_test.rb219
-rw-r--r--actioncable/test/connection/base_test.rb19
-rw-r--r--actioncable/test/connection/identifier_test.rb4
-rw-r--r--actioncable/test/connection/multiple_identifiers_test.rb4
-rw-r--r--actioncable/test/stubs/test_server.rb3
-rw-r--r--actioncable/test/subscription_adapter/async_test.rb17
-rw-r--r--actioncable/test/subscription_adapter/common.rb139
-rw-r--r--actioncable/test/subscription_adapter/evented_redis_test.rb10
-rw-r--r--actioncable/test/subscription_adapter/inline_test.rb17
-rw-r--r--actioncable/test/subscription_adapter/postgresql_test.rb32
-rw-r--r--actioncable/test/subscription_adapter/redis_test.rb16
-rw-r--r--actioncable/test/test_helper.rb24
-rw-r--r--actioncable/test/worker_test.rb5
49 files changed, 1315 insertions, 179 deletions
diff --git a/actioncable/.gitignore b/actioncable/.gitignore
new file mode 100644
index 0000000000..0a04b29786
--- /dev/null
+++ b/actioncable/.gitignore
@@ -0,0 +1,2 @@
+/lib/assets/compiled
+/tmp
diff --git a/actioncable/README.md b/actioncable/README.md
index 636f9935cf..ac57532b62 100644
--- a/actioncable/README.md
+++ b/actioncable/README.md
@@ -66,6 +66,13 @@ end
Here `identified_by` is a connection identifier that can be used to find the specific connection again or later.
Note that anything marked as an identifier will automatically create a delegate by the same name on any channel instances created off the connection.
+This relies on the fact that you will already have handled authentication of the user, and
+that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
+automatically sent to the connection instance when a new connection is attempted, and you
+use that to set the `current_user`. By identifying the connection by this same current_user,
+you're also ensuring that you can later retrieve all open connections by a given user (and
+potentially disconnect them all if the user is deleted or deauthorized).
+
Then you should define your `ApplicationCable::Channel` class in Ruby. This is the place where you put
shared logic between your channels.
@@ -77,13 +84,6 @@ module ApplicationCable
end
```
-This relies on the fact that you will already have handled authentication of the user, and
-that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
-automatically sent to the connection instance when a new connection is attempted, and you
-use that to set the `current_user`. By identifying the connection by this same current_user,
-you're also ensuring that you can later retrieve all open connections by a given user (and
-potentially disconnect them all if the user is deleted or deauthorized).
-
The client-side needs to setup a consumer instance of this connection. That's done like so:
```coffeescript
@@ -302,7 +302,7 @@ Action Cable has three required configurations: the Redis connection, allowed re
### Redis
-By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/redis/cable.yml')`.
+By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/cable.yml')`.
This file must specify a Redis url for each Rails environment. It may use the following format:
```yaml
@@ -316,7 +316,7 @@ test: *development
You can also change the location of the Redis config file in a Rails initializer with something like:
```ruby
-Rails.application.paths.add "config/redis/cable", with: "somewhere/else/cable.yml"
+Rails.application.paths.add "config/cable", with: "somewhere/else/cable.yml"
```
### Allowed Request Origins
@@ -397,8 +397,6 @@ application. The recommended basic setup is as follows:
require ::File.expand_path('../../config/environment', __FILE__)
Rails.application.eager_load!
-require 'action_cable/process/logging'
-
run ActionCable.server
```
@@ -443,11 +441,10 @@ The Ruby side of things is built on top of [faye-websocket](https://github.com/f
## Deployment
-Action Cable is powered by a combination of EventMachine and threads. The
-framework plumbing needed for connection handling is handled in the
-EventMachine loop, but the actual channel, user-specified, work is handled
-in a normal Ruby thread. This means you can use all your regular Rails models
-with no problem, as long as you haven't committed any thread-safety sins.
+Action Cable is powered by a combination of websockets and threads. All of the
+connection management is handled internally by utilizing Ruby’s native thread
+support, which means you can use all your regular Rails models with no problems
+as long as you haven’t committed any thread-safety sins.
But this also means that Action Cable needs to run in its own server process.
So you'll have one set of server processes for your normal web work, and another
diff --git a/actioncable/Rakefile b/actioncable/Rakefile
index b6c56e9195..1d77fc7067 100644
--- a/actioncable/Rakefile
+++ b/actioncable/Rakefile
@@ -1,9 +1,16 @@
require 'rake/testtask'
+require 'pathname'
+require 'sprockets'
+require 'coffee-script'
+require 'action_cable'
dir = File.dirname(__FILE__)
task :default => :test
+task :package => "assets:compile"
+task "package:clean" => "assets:clean"
+
Rake::TestTask.new do |t|
t.libs << "test"
t.test_files = Dir.glob("#{dir}/test/**/*_test.rb")
@@ -11,3 +18,40 @@ Rake::TestTask.new do |t|
t.verbose = true
t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION)
end
+
+namespace :assets do
+ root_path = Pathname.new(dir)
+ destination_path = root_path.join("lib/assets/compiled")
+
+ desc "Compile dist/action_cable.js"
+ task :compile do
+ puts 'Compiling Action Cable assets...'
+
+ precompile_list = %w(action_cable.js)
+
+ environment = Sprockets::Environment.new
+ environment.gzip = false
+ Pathname.glob(root_path.join("app/assets/*/")) do |subdir|
+ environment.append_path subdir
+ end
+
+ compile_path = root_path.join("tmp/sprockets")
+ compile_path.rmtree if compile_path.exist?
+ compile_path.mkpath
+
+ manifest = Sprockets::Manifest.new(environment.index, compile_path)
+ manifest.compile(precompile_list)
+
+ destination_path.rmtree if destination_path.exist?
+ manifest.assets.each do |path, fingerprint_path|
+ destination_path.join(path).dirname.mkpath
+ FileUtils.cp(compile_path.join(fingerprint_path), destination_path.join(path))
+ end
+
+ puts 'Done'
+ end
+
+ task :clean do
+ destination_path.rmtree if destination_path.exist?
+ end
+end
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index a36acc8f6f..c65ff7871f 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -20,14 +20,6 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version
- s.add_dependency 'coffee-rails', '~> 4.1.0'
- s.add_dependency 'eventmachine', '~> 1.0'
- s.add_dependency 'faye-websocket', '~> 0.10.0'
+ s.add_dependency 'nio4r', '~> 1.2'
s.add_dependency 'websocket-driver', '~> 0.6.1'
-
- s.add_development_dependency 'em-hiredis', '~> 0.3.0'
- s.add_development_dependency 'mocha'
- s.add_development_dependency 'pg'
- s.add_development_dependency 'puma'
- s.add_development_dependency 'redis', '~> 3.0'
end
diff --git a/actioncable/lib/assets/javascripts/action_cable.coffee.erb b/actioncable/app/assets/javascripts/action_cable.coffee.erb
index 7daea4ebcd..18a48c0610 100644
--- a/actioncable/lib/assets/javascripts/action_cable.coffee.erb
+++ b/actioncable/app/assets/javascripts/action_cable.coffee.erb
@@ -1,5 +1,5 @@
#= require_self
-#= require action_cable/consumer
+#= require ./action_cable/consumer
@ActionCable =
INTERNAL: <%= ActionCable::INTERNAL.to_json %>
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/app/assets/javascripts/action_cable/connection.coffee
index fbd7dbd35b..fbd7dbd35b 100644
--- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/connection.coffee
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee
index 99b9a1c6d5..99b9a1c6d5 100644
--- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee
diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/consumer.coffee
index fcd8d0fb6c..717c0641a9 100644
--- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/consumer.coffee
@@ -1,7 +1,7 @@
-#= require action_cable/connection
-#= require action_cable/connection_monitor
-#= require action_cable/subscriptions
-#= require action_cable/subscription
+#= require ./connection
+#= require ./connection_monitor
+#= require ./subscriptions
+#= require ./subscription
# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established,
# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates.
diff --git a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/subscription.coffee
index 339d676933..339d676933 100644
--- a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/subscription.coffee
diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee
index ae041ffa2b..ae041ffa2b 100644
--- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 7f0fb37afc..56597d02d7 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -27,14 +27,14 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
+ active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
end
def stop_periodic_timers
- active_periodic_timers.each { |timer| timer.cancel }
+ active_periodic_timers.each { |timer| timer.shutdown }
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 589946c3db..a26373e387 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -75,8 +75,8 @@ module ActionCable
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- EM.next_tick do
- pubsub.subscribe(broadcasting, callback, lambda do |reply|
+ Concurrent.global_io_executor.post do
+ pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index b672e00682..902efb07e2 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -5,12 +5,15 @@ module ActionCable
eager_autoload do
autoload :Authorization
autoload :Base
+ autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
autoload :MessageBuffer
- autoload :WebSocket
+ autoload :Stream
+ autoload :StreamEventLoop
autoload :Subscriptions
autoload :TaggedLoggerProxy
+ autoload :WebSocket
end
end
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index bb8850aaa0..b5f898436a 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,14 +49,14 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger
- delegate :worker_pool, :pubsub, to: :server
+ delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -70,10 +70,6 @@ module ActionCable
logger.info started_request_message
if websocket.possible? && allow_request_origin?
- websocket.on(:open) { |event| send_async :on_open }
- websocket.on(:message) { |event| on_message event.data }
- websocket.on(:close) { |event| send_async :on_close }
-
respond_to_successful_request
else
respond_to_invalid_request
@@ -121,6 +117,22 @@ module ActionCable
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
+ def on_open # :nodoc:
+ send_async :handle_open
+ end
+
+ def on_message(message) # :nodoc:
+ message_buffer.append message
+ end
+
+ def on_error(message) # :nodoc:
+ # ignore
+ end
+
+ def on_close(reason, code) # :nodoc:
+ send_async :handle_close
+ end
+
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@@ -139,7 +151,7 @@ module ActionCable
attr_reader :message_buffer
private
- def on_open
+ def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
@@ -150,11 +162,7 @@ module ActionCable
respond_to_invalid_request
end
- def on_message(message)
- message_buffer.append message
- end
-
- def on_close
+ def handle_close
logger.info finished_request_message
server.remove_connection(self)
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
new file mode 100644
index 0000000000..ef937d7c16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -0,0 +1,150 @@
+require 'websocket/driver'
+
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class ClientSocket # :nodoc:
+ def self.determine_url(env)
+ scheme = secure_request?(env) ? 'wss:' : 'ws:'
+ "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }"
+ end
+
+ def self.secure_request?(env)
+ return true if env['HTTPS'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SSL'] == 'on'
+ return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https'
+ return true if env['HTTP_X_FORWARDED_PROTO'] == 'https'
+ return true if env['rack.url_scheme'] == 'https'
+
+ return false
+ end
+
+ CONNECTING = 0
+ OPEN = 1
+ CLOSING = 2
+ CLOSED = 3
+
+ attr_reader :env, :url
+
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+ @stream_event_loop = stream_event_loop
+
+ @url = ClientSocket.determine_url(@env)
+
+ @driver = @driver_started = nil
+ @close_params = ['', 1006]
+
+ @ready_state = CONNECTING
+
+ # The driver calls +env+, +url+, and +write+
+ @driver = ::WebSocket::Driver.rack(self)
+
+ @driver.on(:open) { |e| open }
+ @driver.on(:message) { |e| receive_message(e.data) }
+ @driver.on(:close) { |e| begin_close(e.reason, e.code) }
+ @driver.on(:error) { |e| emit_error(e.message) }
+
+ @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+
+ if callback = @env['async.callback']
+ callback.call([101, {}, @stream])
+ end
+ end
+
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @driver_started = true
+ @driver.start
+ end
+
+ def rack_response
+ start_driver
+ [ -1, {}, [] ]
+ end
+
+ def write(data)
+ @stream.write(data)
+ end
+
+ def transmit(message)
+ return false if @ready_state > OPEN
+ case message
+ when Numeric then @driver.text(message.to_s)
+ when String then @driver.text(message)
+ when Array then @driver.binary(message)
+ else false
+ end
+ end
+
+ def close(code = nil, reason = nil)
+ code ||= 1000
+ reason ||= ''
+
+ unless code == 1000 or (code >= 3000 and code <= 4999)
+ raise ArgumentError, "Failed to execute 'close' on WebSocket: " +
+ "The code must be either 1000, or between 3000 and 4999. " +
+ "#{code} is neither."
+ end
+
+ @ready_state = CLOSING unless @ready_state == CLOSED
+ @driver.close(reason, code)
+ end
+
+ def parse(data)
+ @driver.parse(data)
+ end
+
+ def client_gone
+ finalize_close
+ end
+
+ def alive?
+ @ready_state == OPEN
+ end
+
+ private
+ def open
+ return unless @ready_state == CONNECTING
+ @ready_state = OPEN
+
+ @event_target.on_open
+ end
+
+ def receive_message(data)
+ return unless @ready_state == OPEN
+
+ @event_target.on_message(data)
+ end
+
+ def emit_error(message)
+ return if @ready_state >= CLOSING
+
+ @event_target.on_error(message)
+ end
+
+ def begin_close(reason, code)
+ return if @ready_state == CLOSED
+ @ready_state = CLOSING
+ @close_params = [reason, code]
+
+ if @stream
+ @stream.shutdown
+ else
+ finalize_close
+ end
+ end
+
+ def finalize_close
+ return if @ready_state == CLOSED
+ @ready_state = CLOSED
+
+ @event_target.on_close(*@close_params)
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 54ed7672d2..27826792b3 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_channel, callback) }
+ Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
new file mode 100644
index 0000000000..ace250cd16
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -0,0 +1,59 @@
+module ActionCable
+ module Connection
+ #--
+ # This class is heavily based on faye-websocket-ruby
+ #
+ # Copyright (c) 2010-2015 James Coglan
+ class Stream
+ def initialize(event_loop, socket)
+ @event_loop = event_loop
+ @socket_object = socket
+ @stream_send = socket.env['stream.send']
+
+ @rack_hijack_io = nil
+
+ hijack_rack_socket
+ end
+
+ def each(&callback)
+ @stream_send ||= callback
+ end
+
+ def close
+ shutdown
+ @socket_object.client_gone
+ end
+
+ def shutdown
+ clean_rack_hijack
+ end
+
+ def write(data)
+ return @rack_hijack_io.write(data) if @rack_hijack_io
+ return @stream_send.call(data) if @stream_send
+ rescue EOFError
+ @socket_object.client_gone
+ end
+
+ def receive(data)
+ @socket_object.parse(data)
+ end
+
+ private
+ def hijack_rack_socket
+ return unless @socket_object.env['rack.hijack']
+
+ @socket_object.env['rack.hijack'].call
+ @rack_hijack_io = @socket_object.env['rack.hijack_io']
+
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+
+ def clean_rack_hijack
+ return unless @rack_hijack_io
+ @event_loop.detach(@rack_hijack_io, self)
+ @rack_hijack_io = nil
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
new file mode 100644
index 0000000000..e6335082d2
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -0,0 +1,94 @@
+require 'nio'
+require 'thread'
+
+module ActionCable
+ module Connection
+ class StreamEventLoop
+ def initialize
+ @nio = @thread = nil
+ @map = {}
+ @stopping = false
+ @todo = Queue.new
+
+ @spawn_mutex = Mutex.new
+ spawn
+ end
+
+ def attach(io, stream)
+ @todo << lambda do
+ @map[io] = stream
+ @nio.register(io, :r)
+ end
+ wakeup
+ end
+
+ def detach(io, stream)
+ @todo << lambda do
+ @nio.deregister io
+ @map.delete io
+ end
+ wakeup
+ end
+
+ def stop
+ @stopping = true
+ wakeup if @nio
+ end
+
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
+
+ return true
+ end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
+
+ monitors.each do |monitor|
+ io = monitor.io
+ stream = @map[io]
+
+ begin
+ stream.receive io.read_nonblock(4096)
+ rescue IO::WaitReadable
+ next
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 670d5690ae..5e89fb9b72 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -1,13 +1,11 @@
-require 'faye/websocket'
+require 'websocket/driver'
module ActionCable
module Connection
- # Decorate the Faye::WebSocket with helpers we need.
+ # Wrap the real socket to minimize the externally-presented API
class WebSocket
- delegate :rack_response, :close, :on, to: :websocket
-
- def initialize(env)
- @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
+ def initialize(env, event_target, stream_event_loop)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
end
def possible?
@@ -15,11 +13,19 @@ module ActionCable
end
def alive?
- websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
+ websocket && websocket.alive?
end
def transmit(data)
- websocket.send data
+ websocket.transmit data
+ end
+
+ def close
+ websocket.close
+ end
+
+ def rack_response
+ websocket.rack_response
end
protected
diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb
index b1286aea6f..c652fb91ae 100644
--- a/actioncable/lib/action_cable/gem_version.rb
+++ b/actioncable/lib/action_cable/gem_version.rb
@@ -8,7 +8,7 @@ module ActionCable
MAJOR = 5
MINOR = 0
TINY = 0
- PRE = "beta1"
+ PRE = "beta1.1"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb
deleted file mode 100644
index dce637b3ca..0000000000
--- a/actioncable/lib/action_cable/process/logging.rb
+++ /dev/null
@@ -1,7 +0,0 @@
-require 'action_cable/server'
-require 'eventmachine'
-
-EM.error_handler do |e|
- puts "Error raised inside the event loop: #{e.message}"
- puts e.backtrace.join("\n")
-end
diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb
index a2a89d5f1e..bd6a3826a3 100644
--- a/actioncable/lib/action_cable/server.rb
+++ b/actioncable/lib/action_cable/server.rb
@@ -1,7 +1,3 @@
-require 'eventmachine'
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
module ActionCable
module Server
extend ActiveSupport::Autoload
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index 3385a4c9f3..fe48c112df 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,3 +1,5 @@
+require 'thread'
+
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -13,7 +15,12 @@ module ActionCable
def self.logger; config.logger; end
delegate :logger, to: :config
+ attr_reader :mutex
+
def initialize
+ @mutex = Mutex.new
+
+ @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by rack to setup the server.
@@ -29,25 +36,31 @@ module ActionCable
# Gateway to RemoteConnections. See that class for details.
def remote_connections
- @remote_connections ||= RemoteConnections.new(self)
+ @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
+ end
+
+ def stream_event_loop
+ @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
- @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
+ @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
# Requires and returns a hash of all the channel class constants keyed by name.
def channel_classes
- @channel_classes ||= begin
- config.channel_paths.each { |channel_path| require channel_path }
- config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ @channel_classes || @mutex.synchronize do
+ @channel_classes ||= begin
+ config.channel_paths.each { |channel_path| require channel_path }
+ config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ end
end
end
# Adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= config.pubsub_adapter.new(self)
+ @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index ebbf60c6e2..9a248933c4 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -5,7 +5,7 @@ module ActionCable
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
- attr_accessor :channels_path
+ attr_accessor :channel_load_paths
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :cable, :url
@@ -15,13 +15,15 @@ module ActionCable
@connection_class = ApplicationCable::Connection
@worker_pool_size = 100
- @channels_path = Rails.root.join('app/channels')
+ @channel_load_paths = [Rails.root.join('app/channels')]
@disable_request_forgery_protection = false
end
def channel_paths
- @channels ||= Dir["#{channels_path}/**/*_channel.rb"]
+ @channel_paths ||= channel_load_paths.flat_map do |path|
+ Dir["#{path}/**/*_channel.rb"]
+ end
end
def channel_class_names
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 47dcea8c20..8671dd5ebd 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -22,11 +22,9 @@ module ActionCable
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
- EM.next_tick do
- @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
- EM.next_tick { connections.map(&:beat) }
- end
- end
+ @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
+ Concurrent.global_io_executor.post { connections.map(&:beat) }
+ end.tap(&:execute)
end
def open_connections_statistics
diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb
index e770f4fb00..72e62f3daf 100644
--- a/actioncable/lib/action_cable/subscription_adapter.rb
+++ b/actioncable/lib/action_cable/subscription_adapter.rb
@@ -1,5 +1,8 @@
module ActionCable
module SubscriptionAdapter
- autoload :Base, 'action_cable/subscription_adapter/base'
+ extend ActiveSupport::Autoload
+
+ autoload :Base
+ autoload :SubscriberMap
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
new file mode 100644
index 0000000000..cca6894289
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -0,0 +1,22 @@
+require 'action_cable/subscription_adapter/inline'
+
+module ActionCable
+ module SubscriptionAdapter
+ class Async < Inline # :nodoc:
+ private
+ def new_subscriber_map
+ AsyncSubscriberMap.new
+ end
+
+ class AsyncSubscriberMap < SubscriberMap
+ def add_subscriber(*)
+ Concurrent.global_io_executor.post { super }
+ end
+
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
index 11910803e8..796db5ffa3 100644
--- a/actioncable/lib/action_cable/subscription_adapter/base.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -19,6 +19,10 @@ module ActionCable
def unsubscribe(channel, message_callback)
raise NotImplementedError
end
+
+ def shutdown
+ raise NotImplementedError
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
new file mode 100644
index 0000000000..d697548cbd
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -0,0 +1,67 @@
+require 'thread'
+
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module SubscriptionAdapter
+ class EventedRedis < Base # :nodoc:
+ @@mutex = Mutex.new
+
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback { |reply| success_callback.call } if success_callback
+ end
+ end
+
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
+ private
+ def redis_connection_for_subscriptions
+ ensure_reactor_running
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+ end
+
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
new file mode 100644
index 0000000000..81357faead
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -0,0 +1,35 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Inline < Base # :nodoc:
+ def initialize(*)
+ super
+ @subscriber_map = nil
+ end
+
+ def broadcast(channel, payload)
+ subscriber_map.broadcast(channel, payload)
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ subscriber_map.add_subscriber(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ subscriber_map.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ # nothing to do
+ end
+
+ private
+ def subscriber_map
+ @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
+ end
+
+ def new_subscriber_map
+ SubscriberMap.new
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 6465663c97..abaeb92e54 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -5,6 +5,11 @@ require 'thread'
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ end
+
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
@@ -12,11 +17,15 @@ module ActionCable
end
def subscribe(channel, callback, success_callback = nil)
- listener.subscribe_to(channel, callback, success_callback)
+ listener.add_subscriber(channel, callback, success_callback)
end
def unsubscribe(channel, callback)
- listener.unsubscribe_from(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ listener.shutdown
end
def with_connection(&block) # :nodoc:
@@ -33,17 +42,17 @@ module ActionCable
private
def listener
- @listener ||= Listener.new(self)
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
- class Listener
+ class Listener < SubscriberMap
def initialize(adapter)
+ super()
+
@adapter = adapter
- @subscribers = Hash.new { |h,k| h[k] = [] }
- @sync = Mutex.new
@queue = Queue.new
- Thread.new do
+ @thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
@@ -51,46 +60,45 @@ module ActionCable
def listen
@adapter.with_connection do |pg_conn|
- loop do
- until @queue.empty?
- action, channel, callback = @queue.pop(true)
- escaped_channel = pg_conn.escape_identifier(channel)
-
- if action == :listen
- pg_conn.exec("LISTEN #{escaped_channel}")
- ::EM.next_tick(&callback) if callback
- elsif action == :unlisten
- pg_conn.exec("UNLISTEN #{escaped_channel}")
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ Concurrent.global_io_executor << callback if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
end
- end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
end
end
end
end
end
- def subscribe_to(channel, callback, success_callback)
- @sync.synchronize do
- if @subscribers[channel].empty?
- @queue.push([:listen, channel, success_callback])
- end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
- @subscribers[channel] << callback
- end
+ def add_channel(channel, on_success)
+ @queue.push([:listen, channel, on_success])
end
- def unsubscribe_from(channel, callback)
- @sync.synchronize do
- @subscribers[channel].delete(callback)
+ def remove_channel(channel)
+ @queue.push([:unlisten, channel])
+ end
- if @subscribers[channel].empty?
- @queue.push([:unlisten, channel])
- end
- end
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index d149f28b1f..7076383efe 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,36 +1,162 @@
-gem 'em-hiredis', '~> 0.3.0'
+require 'thread'
+
gem 'redis', '~> 3.0'
-require 'em-hiredis'
require 'redis'
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ @redis_connection_for_broadcasts = nil
+ end
+
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
- def subscribe(channel, message_callback, success_callback = nil)
- redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
- result.callback(&success_callback) if success_callback
- end
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
end
- def unsubscribe(channel, message_callback)
- hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback)
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
+ end
+
+ def shutdown
+ @listener.shutdown if @listener
+ end
+
+ def redis_connection_for_subscriptions
+ ::Redis.new(@server.config.cable)
end
private
- def redis_connection_for_subscriptions
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
- end
- end
+ def listener
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
def redis_connection_for_broadcasts
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+
+ @adapter = adapter
+
+ @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
+ @subscription_lock = Mutex.new
+
+ @raw_client = nil
+
+ @when_connected = []
+
+ @thread = nil
+ end
+
+ def listen(conn)
+ conn.without_reconnect do
+ original_client = conn.client
+
+ conn.subscribe('_action_cable_internal') do |on|
+ on.subscribe do |chan, count|
+ @subscription_lock.synchronize do
+ if count == 1
+ @raw_client = original_client
+
+ until @when_connected.empty?
+ @when_connected.shift.call
+ end
+ end
+
+ if callbacks = @subscribe_callbacks[chan]
+ next_callback = callbacks.shift
+ Concurrent.global_io_executor << next_callback if next_callback
+ @subscribe_callbacks.delete(chan) if callbacks.empty?
+ end
+ end
+ end
+
+ on.message do |chan, message|
+ broadcast(chan, message)
+ end
+
+ on.unsubscribe do |chan, count|
+ if count == 0
+ @subscription_lock.synchronize do
+ @raw_client = nil
+ end
+ end
+ end
+ end
+ end
+ end
+
+ def shutdown
+ @subscription_lock.synchronize do
+ return if @thread.nil?
+
+ when_connected do
+ send_command('unsubscribe')
+ @raw_client = nil
+ end
+ end
+
+ Thread.pass while @thread.alive?
+ end
+
+ def add_channel(channel, on_success)
+ @subscription_lock.synchronize do
+ ensure_listener_running
+ @subscribe_callbacks[channel] << on_success
+ when_connected { send_command('subscribe', channel) }
+ end
+ end
+
+ def remove_channel(channel)
+ @subscription_lock.synchronize do
+ when_connected { send_command('unsubscribe', channel) }
+ end
+ end
+
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
+ end
+
+ private
+ def ensure_listener_running
+ @thread ||= Thread.new do
+ Thread.current.abort_on_exception = true
+
+ conn = @adapter.redis_connection_for_subscriptions
+ listen conn
+ end
+ end
+
+ def when_connected(&block)
+ if @raw_client
+ block.call
+ else
+ @when_connected << block
+ end
+ end
+
+ def send_command(*command)
+ @raw_client.write(command)
+
+ very_raw_connection =
+ @raw_client.connection.instance_variable_defined?(:@connection) &&
+ @raw_client.connection.instance_variable_get(:@connection)
+
+ if very_raw_connection && very_raw_connection.respond_to?(:flush)
+ very_raw_connection.flush
+ end
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..37eed09793
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,53 @@
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ end
+
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+
+ @subscribers[channel] << subscriber
+
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+
+ def broadcast(channel, message)
+ list = @sync.synchronize { @subscribers[channel].dup }
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+
+ def remove_channel(channel)
+ end
+
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end
+end
diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb
index 1590a12f09..64f0247cd6 100644
--- a/actioncable/test/channel/periodic_timers_test.rb
+++ b/actioncable/test/channel/periodic_timers_test.rb
@@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
end
test "timer start and stop" do
- EventMachine::PeriodicTimer.expects(:new).times(2).returns(true)
+ Concurrent::TimerTask.expects(:new).times(2).returns(true)
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
channel.expects(:stop_periodic_timers).once
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index 3fa2b291b7..947efd96d4 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -31,9 +31,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "stream_for" do
run_in_eventmachine do
connection = TestConnection.new
- EM.next_tick do
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
- end
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, ""
channel.stream_for Room.new(1)
@@ -41,39 +39,35 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
end
test "stream_from subscription confirmation" do
- EM.run do
+ run_in_eventmachine do
connection = TestConnection.new
ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission
- EM::Timer.new(0.1) do
- expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
- connection.transmit(expected)
+ wait_for_async
- assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
+ expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ connection.transmit(expected)
- EM.run_deferred_callbacks
- EM.stop
- end
+ assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
end
end
test "subscription confirmation should only be sent out once" do
- EM.run do
+ run_in_eventmachine do
connection = TestConnection.new
channel = ChatChannel.new connection, "test_channel"
channel.send_confirmation
channel.send_confirmation
- EM.run_deferred_callbacks
+ wait_for_async
expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
assert_equal 1, connection.transmissions.size
- EM.stop
end
end
diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb
new file mode 100644
index 0000000000..63e35f194a
--- /dev/null
+++ b/actioncable/test/client/echo_channel.rb
@@ -0,0 +1,18 @@
+class EchoChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from "global"
+ end
+
+ def ding(data)
+ transmit(dong: data['message'])
+ end
+
+ def delay(data)
+ sleep 1
+ transmit(dong: data['message'])
+ end
+
+ def bulk(data)
+ ActionCable.server.broadcast "global", wide: data['message']
+ end
+end
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
new file mode 100644
index 0000000000..199d2b90a3
--- /dev/null
+++ b/actioncable/test/client_test.rb
@@ -0,0 +1,219 @@
+require 'test_helper'
+require 'concurrent'
+
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+
+require 'faye/websocket'
+require 'json'
+
+class ClientTest < ActionCable::TestCase
+ WAIT_WHEN_EXPECTING_EVENT = 3
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+
+ ActionCable.instance_variable_set(:@server, nil)
+ server = ActionCable.server
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+
+ server.config.cable = { adapter: 'async' }.with_indifferent_access
+
+ # and now the "real" setup for our test:
+ server.config.disable_request_forgery_protection = true
+ server.config.channel_load_paths = [File.expand_path('client', __dir__)]
+
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+
+ # faye-websocket is warning-rich
+ @previous_verbose, $VERBOSE = $VERBOSE, nil
+ end
+
+ def teardown
+ $VERBOSE = @previous_verbose
+
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+
+ def with_puma_server(rack_app = ActionCable.server, port = 3099)
+ server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
+ server.add_tcp_listener '127.0.0.1', port
+ server.min_threads = 1
+ server.max_threads = 4
+
+ t = Thread.new { server.run.join }
+ yield port
+
+ ensure
+ server.stop(true) if server
+ t.join if t
+ end
+
+ class SyncClient
+ attr_reader :pings
+
+ def initialize(port)
+ @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
+ @messages = Queue.new
+ @closed = Concurrent::Event.new
+ @has_messages = Concurrent::Event.new
+ @pings = 0
+
+ open = Concurrent::Event.new
+ error = nil
+
+ @ws.on(:error) do |event|
+ if open.set?
+ @messages << RuntimeError.new(event.message)
+ else
+ error = event.message
+ open.set
+ end
+ end
+
+ @ws.on(:open) do |event|
+ open.set
+ end
+
+ @ws.on(:message) do |event|
+ hash = JSON.parse(event.data)
+ if hash['identifier'] == '_ping'
+ @pings += 1
+ else
+ @messages << hash
+ @has_messages.set
+ end
+ end
+
+ @ws.on(:close) do |event|
+ @closed.set
+ end
+
+ open.wait(WAIT_WHEN_EXPECTING_EVENT)
+ raise error if error
+ end
+
+ def read_message
+ @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty?
+ @has_messages.reset if @messages.size < 2
+
+ msg = @messages.pop(true)
+ raise msg if msg.is_a?(Exception)
+
+ msg
+ end
+
+ def read_messages(expected_size = 0)
+ list = []
+ loop do
+ @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT)
+ if @has_messages.set?
+ list << read_message
+ else
+ break
+ end
+ end
+ list
+ end
+
+ def send_message(hash)
+ @ws.send(JSON.dump(hash))
+ end
+
+ def close
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+
+ unless @messages.empty?
+ raise "#{@messages.size} messages unprocessed"
+ end
+
+ @ws.close
+ @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
+ end
+ end
+
+ def faye_client(port)
+ SyncClient.new(port)
+ end
+
+ def test_single_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close
+ end
+ end
+
+ def test_interacting_clients
+ with_puma_server do |port|
+ clients = 10.times.map { faye_client(port) }
+
+ barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
+ barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
+
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello')
+ barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
+ assert_equal clients.size, c.read_messages(clients.size).size
+ } }.each(&:wait!)
+
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+
+ def test_many_clients
+ with_puma_server do |port|
+ clients = 200.times.map { faye_client(port) }
+
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ } }.each(&:wait!)
+
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+
+ def test_disappearing_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
+ c.close # disappear before write
+
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close # disappear before read
+ end
+ end
+end
diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb
index 182562db82..e2b017a9a1 100644
--- a/actioncable/test/connection/base_test.rb
+++ b/actioncable/test/connection/base_test.rb
@@ -37,6 +37,8 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
connection.process
assert connection.websocket.possible?
+
+ wait_for_async
assert connection.websocket.alive?
end
end
@@ -53,16 +55,15 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
test "on connection open" do
run_in_eventmachine do
connection = open_connection
- connection.process
connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
connection.message_buffer.expects(:process!)
- # Allow EM to run on_open callback
- EM.next_tick do
- assert_equal [ connection ], @server.connections
- assert connection.connected
- end
+ connection.process
+ wait_for_async
+
+ assert_equal [ connection ], @server.connections
+ assert connection.connected
end
end
@@ -72,12 +73,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
connection.process
# Setup the connection
- EventMachine.stubs(:add_periodic_timer).returns(true)
- connection.send :on_open
+ Concurrent::TimerTask.stubs(:new).returns(true)
+ connection.send :handle_open
assert connection.connected
connection.subscriptions.expects(:unsubscribe_from_all)
- connection.send :on_close
+ connection.send :handle_close
assert ! connection.connected
assert_equal [], @server.connections
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
index a110dfdee0..1019ad541e 100644
--- a/actioncable/test/connection/identifier_test.rb
+++ b/actioncable/test/connection/identifier_test.rb
@@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
@connection = Connection.new(server, env)
@connection.process
- @connection.send :on_open
+ @connection.send :handle_open
end
def close_connection
- @connection.send :on_close
+ @connection.send :handle_close
end
end
diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb
index 55a9f96cb3..e9bb4e6d7f 100644
--- a/actioncable/test/connection/multiple_identifiers_test.rb
+++ b/actioncable/test/connection/multiple_identifiers_test.rb
@@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase
@connection = Connection.new(server, env)
@connection.process
- @connection.send :on_open
+ @connection.send :handle_open
end
def close_connection
- @connection.send :on_close
+ @connection.send :handle_close
end
end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index 6e6541a952..56d132b30a 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -14,6 +14,7 @@ class TestServer
@config.subscription_adapter.new(self)
end
- def send_async
+ def stream_event_loop
+ @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
end
end
diff --git a/actioncable/test/subscription_adapter/async_test.rb b/actioncable/test/subscription_adapter/async_test.rb
new file mode 100644
index 0000000000..8f413f14c2
--- /dev/null
+++ b/actioncable/test/subscription_adapter/async_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+
+class AsyncAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ super
+
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+
+ def cable_config
+ { adapter: 'async' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb
new file mode 100644
index 0000000000..361858784e
--- /dev/null
+++ b/actioncable/test/subscription_adapter/common.rb
@@ -0,0 +1,139 @@
+require 'test_helper'
+require 'concurrent'
+
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+
+module CommonSubscriptionAdapterTest
+ WAIT_WHEN_EXPECTING_EVENT = 3
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+
+ server = ActionCable::Server::Base.new
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+
+
+ # and now the "real" setup for our test:
+ server.config.cable = cable_config.with_indifferent_access
+
+ adapter_klass = server.config.pubsub_adapter
+
+ @rx_adapter = adapter_klass.new(server)
+ @tx_adapter = adapter_klass.new(server)
+ end
+
+ def teardown
+ @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
+ @rx_adapter.shutdown if @rx_adapter
+
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+
+
+ def subscribe_as_queue(channel, adapter = @rx_adapter)
+ queue = Queue.new
+
+ callback = -> data { queue << data }
+ subscribed = Concurrent::Event.new
+ adapter.subscribe(channel, callback, Proc.new { subscribed.set })
+ subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
+ assert subscribed.set?
+
+ yield queue
+
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+ assert_empty queue
+ ensure
+ adapter.unsubscribe(channel, callback) if subscribed.set?
+ end
+
+
+ def test_subscribe_and_unsubscribe
+ subscribe_as_queue('channel') do |queue|
+ end
+ end
+
+ def test_basic_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'hello world')
+
+ assert_equal 'hello world', queue.pop
+ end
+ end
+
+ def test_broadcast_after_unsubscribe
+ keep_queue = nil
+ subscribe_as_queue('channel') do |queue|
+ keep_queue = queue
+
+ @tx_adapter.broadcast('channel', 'hello world')
+
+ assert_equal 'hello world', queue.pop
+ end
+
+ @tx_adapter.broadcast('channel', 'hello void')
+
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+ assert_empty keep_queue
+ end
+
+ def test_multiple_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('channel', 'bananas')
+ @tx_adapter.broadcast('channel', 'apples')
+
+ received = []
+ 2.times { received << queue.pop }
+ assert_equal ['apples', 'bananas'], received.sort
+ end
+ end
+
+ def test_identical_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'hello')
+
+ assert_equal 'hello', queue_2.pop
+ end
+
+ assert_equal 'hello', queue.pop
+ end
+ end
+
+ def test_simultaneous_subscriptions
+ subscribe_as_queue('channel') do |queue|
+ subscribe_as_queue('other channel') do |queue_2|
+ @tx_adapter.broadcast('channel', 'apples')
+ @tx_adapter.broadcast('other channel', 'oranges')
+
+ assert_equal 'apples', queue.pop
+ assert_equal 'oranges', queue_2.pop
+ end
+ end
+ end
+
+ def test_channel_filtered_broadcast
+ subscribe_as_queue('channel') do |queue|
+ @tx_adapter.broadcast('other channel', 'one')
+ @tx_adapter.broadcast('channel', 'two')
+
+ assert_equal 'two', queue.pop
+ end
+ end
+end
diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb
new file mode 100644
index 0000000000..70333e51bd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/evented_redis_test.rb
@@ -0,0 +1,10 @@
+require 'test_helper'
+require_relative './common'
+
+class EventedRedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def cable_config
+ { adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/inline_test.rb b/actioncable/test/subscription_adapter/inline_test.rb
new file mode 100644
index 0000000000..75ea51e6b3
--- /dev/null
+++ b/actioncable/test/subscription_adapter/inline_test.rb
@@ -0,0 +1,17 @@
+require 'test_helper'
+require_relative './common'
+
+class InlineAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ super
+
+ @tx_adapter.shutdown
+ @tx_adapter = @rx_adapter
+ end
+
+ def cable_config
+ { adapter: 'inline' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/postgresql_test.rb b/actioncable/test/subscription_adapter/postgresql_test.rb
new file mode 100644
index 0000000000..64c632b0cd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/postgresql_test.rb
@@ -0,0 +1,32 @@
+require 'test_helper'
+require_relative './common'
+
+require 'active_record'
+
+class PostgresqlAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def setup
+ database_config = { 'adapter' => 'postgresql', 'database' => 'activerecord_unittest' }
+ ar_tests = File.expand_path('../../../activerecord/test', __dir__)
+ if Dir.exist?(ar_tests)
+ require File.join(ar_tests, 'config')
+ require File.join(ar_tests, 'support/config')
+ local_config = ARTest.config['arunit']
+ database_config.update local_config if local_config
+ end
+ ActiveRecord::Base.establish_connection database_config
+
+ super
+ end
+
+ def teardown
+ super
+
+ ActiveRecord::Base.clear_all_connections!
+ end
+
+ def cable_config
+ { adapter: 'postgresql' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb
new file mode 100644
index 0000000000..4f34dd86c9
--- /dev/null
+++ b/actioncable/test/subscription_adapter/redis_test.rb
@@ -0,0 +1,16 @@
+require 'test_helper'
+require_relative './common'
+
+class RedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def cable_config
+ { adapter: 'redis', driver: 'ruby', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
+
+class RedisAdapterTest::Hiredis < RedisAdapterTest
+ def cable_config
+ super.merge(driver: 'hiredis')
+ end
+end
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 65b45e0c89..8ddbd4e764 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -13,24 +13,16 @@ require 'rack/mock'
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
-require 'faye/websocket'
-class << Faye::WebSocket
- remove_method :ensure_reactor_running
-
- # We don't want Faye to start the EM reactor in tests because it makes testing much harder.
- # We want to be able to start and stop EM loop in tests to make things simpler.
- def ensure_reactor_running
- # no-op
+class ActionCable::TestCase < ActiveSupport::TestCase
+ def wait_for_async
+ e = Concurrent.global_io_executor
+ until e.completed_task_count == e.scheduled_task_count
+ sleep 0.1
+ end
end
-end
-class ActionCable::TestCase < ActiveSupport::TestCase
def run_in_eventmachine
- EM.run do
- yield
-
- EM.run_deferred_callbacks
- EM.stop
- end
+ yield
+ wait_for_async
end
end
diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb
index 9911a3b98b..4a699cde27 100644
--- a/actioncable/test/worker_test.rb
+++ b/actioncable/test/worker_test.rb
@@ -13,6 +13,11 @@ class WorkerTest < ActiveSupport::TestCase
end
def connection
+ self
+ end
+
+ def logger
+ ActionCable.server.logger
end
end