aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
authorkp <keith.j.payne@gmail.com>2016-02-10 17:44:54 +0000
committerkp <keith.j.payne@gmail.com>2016-02-10 17:44:54 +0000
commitec4ae308a76bd86fb6eaf7fa8ee025af0063ee30 (patch)
tree741019b3fbf474ff4cdeefdbeb6ff1f598ffcdfc /actioncable
parent8641de93eb98d4ebdb0db2530c8c79c0c4e2f95e (diff)
parent688996da7b25080a1a2ef74f5b4789f3e5eb670d (diff)
downloadrails-ec4ae308a76bd86fb6eaf7fa8ee025af0063ee30.tar.gz
rails-ec4ae308a76bd86fb6eaf7fa8ee025af0063ee30.tar.bz2
rails-ec4ae308a76bd86fb6eaf7fa8ee025af0063ee30.zip
Merge remote-tracking branch 'origin/master' into actioncable_logging
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/.gitignore2
-rw-r--r--actioncable/CHANGELOG.md21
-rw-r--r--actioncable/README.md27
-rw-r--r--actioncable/Rakefile44
-rw-r--r--actioncable/actioncable.gemspec7
-rw-r--r--actioncable/app/assets/javascripts/action_cable.coffee.erb (renamed from actioncable/lib/assets/javascripts/action_cable.coffee.erb)2
-rw-r--r--actioncable/app/assets/javascripts/action_cable/connection.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/connection.coffee)0
-rw-r--r--actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee)0
-rw-r--r--actioncable/app/assets/javascripts/action_cable/consumer.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/consumer.coffee)8
-rw-r--r--actioncable/app/assets/javascripts/action_cable/subscription.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/subscription.coffee)0
-rw-r--r--actioncable/app/assets/javascripts/action_cable/subscriptions.coffee (renamed from actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee)0
-rw-r--r--actioncable/lib/action_cable/channel/base.rb33
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb26
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb6
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb68
-rw-r--r--actioncable/lib/action_cable/gem_version.rb2
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb12
-rw-r--r--actioncable/lib/action_cable/server/base.rb23
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb22
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb75
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb7
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb163
-rw-r--r--actioncable/lib/rails/generators/channel/templates/channel.rb2
-rw-r--r--actioncable/test/client/echo_channel.rb18
-rw-r--r--actioncable/test/client_test.rb219
-rw-r--r--actioncable/test/subscription_adapter/evented_redis_test.rb10
-rw-r--r--actioncable/test/subscription_adapter/redis_test.rb8
30 files changed, 693 insertions, 129 deletions
diff --git a/actioncable/.gitignore b/actioncable/.gitignore
new file mode 100644
index 0000000000..0a04b29786
--- /dev/null
+++ b/actioncable/.gitignore
@@ -0,0 +1,2 @@
+/lib/assets/compiled
+/tmp
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
index a33b7b6b4b..bfc229d795 100644
--- a/actioncable/CHANGELOG.md
+++ b/actioncable/CHANGELOG.md
@@ -1,3 +1,24 @@
+* Added ActionCable::SubscriptionAdapter::EventedRedis.em_redis_connector/redis_connector and
+ ActionCable::SubscriptionAdapter::Redis.redis_connector factory methods for redis connections,
+ so you can overwrite with your own initializers. This is used when you want to use different-than-standard Redis adapters,
+ like for Makara distributed Redis.
+
+ *DHH*
+
+## Rails 5.0.0.beta2 (February 01, 2016) ##
+
+* Support PostgreSQL pubsub adapter.
+
+ *Jon Moss*
+
+* Remove EventMachine dependency.
+
+ *Matthew Draper*
+
+* Remove Celluloid dependency.
+
+ *Mike Perham*
+
* Create notion of an `ActionCable::SubscriptionAdapter`.
Separate out Redis functionality into
`ActionCable::SubscriptionAdapter::Redis`, and add a
diff --git a/actioncable/README.md b/actioncable/README.md
index cad71ddf94..3ed8a23a29 100644
--- a/actioncable/README.md
+++ b/actioncable/README.md
@@ -17,7 +17,7 @@ The client of a WebSocket connection is called the consumer.
Each consumer can in turn subscribe to multiple cable channels. Each channel encapsulates
a logical unit of work, similar to what a controller does in a regular MVC setup. For example,
-you could have a `ChatChannel` and a `AppearancesChannel`, and a consumer could be subscribed to either
+you could have a `ChatChannel` and an `AppearancesChannel`, and a consumer could be subscribed to either
or to both of these channels. At the very least, a consumer should be subscribed to one channel.
When the consumer is subscribed to a channel, they act as a subscriber. The connection between
@@ -188,7 +188,7 @@ can be reached as remote procedure calls via a subscription's `perform` method.
The appearance example was all about exposing server functionality to client-side invocation over the WebSocket connection.
But the great thing about WebSockets is that it's a two-way street. So now let's show an example where the server invokes
-action on the client.
+an action on the client.
This is a web notification channel that allows you to trigger client-side web notifications when you broadcast to the right
streams:
@@ -324,7 +324,7 @@ Rails.application.paths.add "config/cable", with: "somewhere/else/cable.yml"
Action Cable will only accept requests from specified origins, which are passed to the server config as an array. The origins can be instances of strings or regular expressions, against which a check for match will be performed.
```ruby
-ActionCable.server.config.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/]
+Rails.application.config.action_cable.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/]
```
When running in the development environment, this defaults to "http://localhost:3000".
@@ -332,7 +332,7 @@ When running in the development environment, this defaults to "http://localhost:
To disable and allow requests from any origin:
```ruby
-ActionCable.server.config.disable_request_forgery_protection = true
+Rails.application.config.action_cable.disable_request_forgery_protection = true
```
### Consumer Configuration
@@ -374,7 +374,7 @@ App.cable = ActionCable.createConsumer()
The other common option to configure is the log tags applied to the per-connection logger. Here's close to what we're using in Basecamp:
```ruby
-ActionCable.server.config.log_tags = [
+Rails.application.config.action_cable.log_tags = [
-> request { request.env['bc.account_id'] || "no-account" },
:action_cable,
-> request { request.uuid }
@@ -397,8 +397,6 @@ application. The recommended basic setup is as follows:
require ::File.expand_path('../../config/environment', __FILE__)
Rails.application.eager_load!
-require 'action_cable/process/logging'
-
run ActionCable.server
```
@@ -438,7 +436,7 @@ messages back and forth over the WebSocket cable connection. This dependency may
be alleviated in the future, but for the moment that's what it is. So be sure to have
Redis installed and running.
-The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby).
+The Ruby side of things is built on top of [websocket-driver](https://github.com/faye/websocket-driver-ruby), [nio4r](https://github.com/celluloid/nio4r), and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby).
## Deployment
@@ -450,8 +448,17 @@ as long as you haven’t committed any thread-safety sins.
But this also means that Action Cable needs to run in its own server process.
So you'll have one set of server processes for your normal web work, and another
-set of server processes for the Action Cable. The former can be single-threaded,
-like Unicorn, but the latter must be multi-threaded, like Puma.
+set of server processes for the Action Cable.
+
+The Action Cable server does _not_ need to be a multi-threaded application server.
+This is because Action Cable uses the [Rack socket hijacking API](http://old.blog.phusion.nl/2013/01/23/the-new-rack-socket-hijacking-api/)
+to take over control of connections from the application server. Action Cable
+then manages connections internally, in a multithreaded manner, regardless of
+whether the application server is multi-threaded or not. So Action Cable works
+with all the popular application servers -- Unicorn, Puma and Passenger.
+
+Action Cable does not work with WEBrick, because WEBrick does not support the
+Rack socket hijacking API.
## License
diff --git a/actioncable/Rakefile b/actioncable/Rakefile
index b6c56e9195..1d77fc7067 100644
--- a/actioncable/Rakefile
+++ b/actioncable/Rakefile
@@ -1,9 +1,16 @@
require 'rake/testtask'
+require 'pathname'
+require 'sprockets'
+require 'coffee-script'
+require 'action_cable'
dir = File.dirname(__FILE__)
task :default => :test
+task :package => "assets:compile"
+task "package:clean" => "assets:clean"
+
Rake::TestTask.new do |t|
t.libs << "test"
t.test_files = Dir.glob("#{dir}/test/**/*_test.rb")
@@ -11,3 +18,40 @@ Rake::TestTask.new do |t|
t.verbose = true
t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION)
end
+
+namespace :assets do
+ root_path = Pathname.new(dir)
+ destination_path = root_path.join("lib/assets/compiled")
+
+ desc "Compile dist/action_cable.js"
+ task :compile do
+ puts 'Compiling Action Cable assets...'
+
+ precompile_list = %w(action_cable.js)
+
+ environment = Sprockets::Environment.new
+ environment.gzip = false
+ Pathname.glob(root_path.join("app/assets/*/")) do |subdir|
+ environment.append_path subdir
+ end
+
+ compile_path = root_path.join("tmp/sprockets")
+ compile_path.rmtree if compile_path.exist?
+ compile_path.mkpath
+
+ manifest = Sprockets::Manifest.new(environment.index, compile_path)
+ manifest.compile(precompile_list)
+
+ destination_path.rmtree if destination_path.exist?
+ manifest.assets.each do |path, fingerprint_path|
+ destination_path.join(path).dirname.mkpath
+ FileUtils.cp(compile_path.join(fingerprint_path), destination_path.join(path))
+ end
+
+ puts 'Done'
+ end
+
+ task :clean do
+ destination_path.rmtree if destination_path.exist?
+ end
+end
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index 14f968f1ef..c65ff7871f 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -20,13 +20,6 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version
- s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'nio4r', '~> 1.2'
s.add_dependency 'websocket-driver', '~> 0.6.1'
-
- s.add_development_dependency 'em-hiredis', '~> 0.3.0'
- s.add_development_dependency 'mocha'
- s.add_development_dependency 'pg'
- s.add_development_dependency 'puma'
- s.add_development_dependency 'redis', '~> 3.0'
end
diff --git a/actioncable/lib/assets/javascripts/action_cable.coffee.erb b/actioncable/app/assets/javascripts/action_cable.coffee.erb
index 7daea4ebcd..18a48c0610 100644
--- a/actioncable/lib/assets/javascripts/action_cable.coffee.erb
+++ b/actioncable/app/assets/javascripts/action_cable.coffee.erb
@@ -1,5 +1,5 @@
#= require_self
-#= require action_cable/consumer
+#= require ./action_cable/consumer
@ActionCable =
INTERNAL: <%= ActionCable::INTERNAL.to_json %>
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/app/assets/javascripts/action_cable/connection.coffee
index fbd7dbd35b..fbd7dbd35b 100644
--- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/connection.coffee
diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee
index 99b9a1c6d5..99b9a1c6d5 100644
--- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee
diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/consumer.coffee
index fcd8d0fb6c..717c0641a9 100644
--- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/consumer.coffee
@@ -1,7 +1,7 @@
-#= require action_cable/connection
-#= require action_cable/connection_monitor
-#= require action_cable/subscriptions
-#= require action_cable/subscription
+#= require ./connection
+#= require ./connection_monitor
+#= require ./subscriptions
+#= require ./subscription
# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established,
# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates.
diff --git a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/subscription.coffee
index 339d676933..339d676933 100644
--- a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/subscription.coffee
diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee
index ae041ffa2b..ae041ffa2b 100644
--- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee
+++ b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 88cdc1cab1..874ebe2e71 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -32,8 +32,11 @@ module ActionCable
#
# == Action processing
#
- # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's a remote-procedure call model. You can
- # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client.
+ # Unlike subclasses of ActionController::Base, channels do not follow a REST
+ # constraint form for their actions. Instead, ActionCable operates through a
+ # remote-procedure call model. You can declare any public method on the
+ # channel (optionally taking a <tt>data</tt> argument), and this method is
+ # automatically exposed as callable to the client.
#
# Example:
#
@@ -60,18 +63,22 @@ module ActionCable
# end
# end
#
- # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away
- # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then
- # uses as part of its model call. #away does not, it's simply a trigger action.
+ # In this example, subscribed/unsubscribed are not callable methods, as they
+ # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
+ # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
+ # callable as it's a private method. You'll see that appear accepts a data
+ # parameter, which it then uses as part of its model call. <tt>#away</tt>
+ # does not, since it's simply a trigger action.
#
- # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection.
- # All such identifiers will automatically create a delegation method of the same name on the channel instance.
+ # Also note that in this example, <tt>current_user</tt> is available because
+ # it was marked as an identifying attribute on the connection. All such
+ # identifiers will automatically create a delegation method of the same name
+ # on the channel instance.
#
# == Rejecting subscription requests
#
- # A channel can reject a subscription request in the #subscribed callback by invoking #reject!
- #
- # Example:
+ # A channel can reject a subscription request in the #subscribed callback by
+ # invoking the #reject method:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
@@ -80,8 +87,10 @@ module ActionCable
# end
# end
#
- # In this example, the subscription will be rejected if the current_user does not have access to the chat room.
- # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request.
+ # In this example, the subscription will be rejected if the
+ # <tt>current_user</tt> does not have access to the chat room. On the
+ # client-side, the <tt>Channel#rejected</tt> callback will get invoked when
+ # the server rejects the subscription request.
class Base
include Callbacks
include PeriodicTimers
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index a26373e387..3158f30814 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -41,22 +41,23 @@ module ActionCable
# Example below shows how you can use this to provide performance introspection in the process:
#
# class ChatChannel < ApplicationCable::Channel
- # def subscribed
- # @room = Chat::Room[params[:room_number]]
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
#
- # stream_for @room, -> (encoded_message) do
- # message = ActiveSupport::JSON.decode(encoded_message)
+ # stream_for @room, -> (encoded_message) do
+ # message = ActiveSupport::JSON.decode(encoded_message)
#
- # if message['originated_at'].present?
- # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
+ # if message['originated_at'].present?
+ # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
- # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
- # logger.info "Message took #{elapsed_time}s to arrive"
- # end
+ # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
+ # logger.info "Message took #{elapsed_time}s to arrive"
+ # end
#
- # transmit message
- # end
- # end
+ # transmit message
+ # end
+ # end
+ # end
#
# You can stop streaming from all broadcasts by calling #stop_all_streams.
module Streams
@@ -90,6 +91,7 @@ module ActionCable
stream_from(broadcasting_for([ channel_name, model ]), callback)
end
+ # Unsubscribes all streams associated with this channel from the pubsub queue.
def stop_all_streams
streams.each do |broadcasting, callback|
pubsub.unsubscribe broadcasting, callback
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index de7ff96d7b..e23789978c 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -129,7 +129,7 @@ module ActionCable
# ignore
end
- def on_close # :nodoc:
+ def on_close(reason, code) # :nodoc:
send_async :handle_close
end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index 62dd753646..ef937d7c16 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -37,6 +37,7 @@ module ActionCable
@url = ClientSocket.determine_url(@env)
@driver = @driver_started = nil
+ @close_params = ['', 1006]
@ready_state = CONNECTING
@@ -142,10 +143,7 @@ module ActionCable
return if @ready_state == CLOSED
@ready_state = CLOSED
- reason = @close_params ? @close_params[0] : ''
- code = @close_params ? @close_params[1] : 1006
-
- @event_target.on_close(code, reason)
+ @event_target.on_close(*@close_params)
end
end
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index f773814973..e6335082d2 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -1,18 +1,17 @@
require 'nio'
+require 'thread'
module ActionCable
module Connection
class StreamEventLoop
def initialize
- @nio = NIO::Selector.new
+ @nio = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
- Thread.new do
- Thread.current.abort_on_exception = true
- run
- end
+ @spawn_mutex = Mutex.new
+ spawn
end
def attach(io, stream)
@@ -20,34 +19,53 @@ module ActionCable
@map[io] = stream
@nio.register(io, :r)
end
- @nio.wakeup
+ wakeup
end
def detach(io, stream)
@todo << lambda do
- @nio.deregister(io)
+ @nio.deregister io
@map.delete io
end
- @nio.wakeup
+ wakeup
end
def stop
@stopping = true
- @nio.wakeup
+ wakeup if @nio
end
- def run
- loop do
- if @stopping
- @nio.close
- break
- end
+ private
+ def spawn
+ return if @thread && @thread.status
+
+ @spawn_mutex.synchronize do
+ return if @thread && @thread.status
+
+ @nio ||= NIO::Selector.new
+ @thread = Thread.new { run }
- until @todo.empty?
- @todo.pop(true).call
+ return true
end
+ end
+
+ def wakeup
+ spawn || @nio.wakeup
+ end
+
+ def run
+ loop do
+ if @stopping
+ @nio.close
+ break
+ end
+
+ until @todo.empty?
+ @todo.pop(true).call
+ end
+
+ next unless monitors = @nio.select
- if monitors = @nio.select
monitors.each do |monitor|
io = monitor.io
stream = @map[io]
@@ -56,13 +74,21 @@ module ActionCable
stream.receive io.read_nonblock(4096)
rescue IO::WaitReadable
next
- rescue EOFError
- stream.close
+ rescue
+ # We expect one of EOFError or Errno::ECONNRESET in
+ # normal operation (when the client goes away). But if
+ # anything else goes wrong, this is still the best way
+ # to handle it.
+ begin
+ stream.close
+ rescue
+ @nio.deregister io
+ @map.delete io
+ end
end
end
end
end
- end
end
end
end
diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb
index c652fb91ae..a71603e61a 100644
--- a/actioncable/lib/action_cable/gem_version.rb
+++ b/actioncable/lib/action_cable/gem_version.rb
@@ -8,7 +8,7 @@ module ActionCable
MAJOR = 5
MINOR = 0
TINY = 0
- PRE = "beta1.1"
+ PRE = "beta2"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
index aa2fc95d2f..7ec121308a 100644
--- a/actioncable/lib/action_cable/remote_connections.rb
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -1,6 +1,7 @@
module ActionCable
- # If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by
- # searching the identifier declared on the connection. Example:
+ # If you need to disconnect a given connection, you can go through the
+ # RemoteConnections. You can find the connections you're looking for by
+ # searching for the identifier declared on the connection. For example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
@@ -11,8 +12,9 @@ module ActionCable
#
# ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect
#
- # That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses
- # the internal channel that all these servers are subscribed to).
+ # This will disconnect all the connections established for
+ # <tt>User.find(1)</tt> across all servers running on all machines, because
+ # it uses the internal channel that all these servers are subscribed to.
class RemoteConnections
attr_reader :server
@@ -25,7 +27,7 @@ module ActionCable
end
private
- # Represents a single remote connection found via ActionCable.server.remote_connections.where(*).
+ # Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>.
# Exists for the solely for the purpose of calling #disconnect on that connection.
class RemoteConnection
class InvalidIdentifiersError < StandardError; end
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index b00abd208c..fe48c112df 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,3 +1,5 @@
+require 'thread'
+
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -13,7 +15,12 @@ module ActionCable
def self.logger; config.logger; end
delegate :logger, to: :config
+ attr_reader :mutex
+
def initialize
+ @mutex = Mutex.new
+
+ @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by rack to setup the server.
@@ -29,29 +36,31 @@ module ActionCable
# Gateway to RemoteConnections. See that class for details.
def remote_connections
- @remote_connections ||= RemoteConnections.new(self)
+ @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
def stream_event_loop
- @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
+ @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
- @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
+ @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
# Requires and returns a hash of all the channel class constants keyed by name.
def channel_classes
- @channel_classes ||= begin
- config.channel_paths.each { |channel_path| require channel_path }
- config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ @channel_classes || @mutex.synchronize do
+ @channel_classes ||= begin
+ config.channel_paths.each { |channel_path| require channel_path }
+ config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ end
end
end
# Adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= config.pubsub_adapter.new(self)
+ @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index 4a26ed9269..7e8aef45f4 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -4,19 +4,19 @@ module ActionCable
# broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example:
#
# class WebNotificationsChannel < ApplicationCable::Channel
- # def subscribed
- # stream_from "web_notifications_#{current_user.id}"
- # end
- # end
+ # def subscribed
+ # stream_from "web_notifications_#{current_user.id}"
+ # end
+ # end
#
- # # Somewhere in your app this is called, perhaps from a NewCommentJob
- # ActionCable.server.broadcast \
- # "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' }
+ # # Somewhere in your app this is called, perhaps from a NewCommentJob
+ # ActionCable.server.broadcast \
+ # "web_notifications_1", { title: "New things!", body: "All that's fit for print" }
#
- # # Client-side coffescript, which assumes you've already requested the right to send web notifications
- # App.cable.subscriptions.create "WebNotificationsChannel",
- # received: (data) ->
- # new Notification data['title'], body: data['body']
+ # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications
+ # App.cable.subscriptions.create "WebNotificationsChannel",
+ # received: (data) ->
+ # new Notification data['title'], body: data['body']
module Broadcasting
# Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded.
def broadcast(broadcasting, message)
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index c88b03947a..cca6894289 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -4,8 +4,8 @@ module ActionCable
module SubscriptionAdapter
class Async < Inline # :nodoc:
private
- def subscriber_map
- @subscriber_map ||= AsyncSubscriberMap.new
+ def new_subscriber_map
+ AsyncSubscriberMap.new
end
class AsyncSubscriberMap < SubscriberMap
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
new file mode 100644
index 0000000000..af04a58c70
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -0,0 +1,75 @@
+require 'thread'
+
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module SubscriptionAdapter
+ class EventedRedis < Base # :nodoc:
+ @@mutex = Mutex.new
+
+ # Overwrite this factory method for EventMachine redis connections if you want to use a different Redis library than EM::Hiredis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } }
+
+ # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback { |reply| success_callback.call } if success_callback
+ end
+ end
+
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
+ private
+ def redis_connection_for_subscriptions
+ ensure_reactor_running
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+ end
+
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
+ end
+ end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
index 4a2a8d23a2..81357faead 100644
--- a/actioncable/lib/action_cable/subscription_adapter/inline.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -1,6 +1,11 @@
module ActionCable
module SubscriptionAdapter
class Inline < Base # :nodoc:
+ def initialize(*)
+ super
+ @subscriber_map = nil
+ end
+
def broadcast(channel, payload)
subscriber_map.broadcast(channel, payload)
end
@@ -19,7 +24,11 @@ module ActionCable
private
def subscriber_map
- @subscriber_map ||= SubscriberMap.new
+ @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
+ end
+
+ def new_subscriber_map
+ SubscriberMap.new
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 3ce1bbed68..abaeb92e54 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -5,6 +5,11 @@ require 'thread'
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ end
+
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
@@ -37,7 +42,7 @@ module ActionCable
private
def listener
- @listener ||= Listener.new(self)
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
class Listener < SubscriberMap
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index a035e3988d..ba4934a264 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,57 +1,166 @@
require 'thread'
-gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
-require 'em-hiredis'
require 'redis'
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
- @@mutex = Mutex.new
+ # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
+ def initialize(*)
+ super
+ @listener = nil
+ @redis_connection_for_broadcasts = nil
+ end
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
- def subscribe(channel, message_callback, success_callback = nil)
- redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
- result.callback { |reply| success_callback.call } if success_callback
- end
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
end
- def unsubscribe(channel, message_callback)
- redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
end
def shutdown
- redis_connection_for_subscriptions.pubsub.close_connection
- @redis_connection_for_subscriptions = nil
+ @listener.shutdown if @listener
+ end
+
+ def redis_connection_for_subscriptions
+ ::Redis.new(@server.config.cable)
end
private
- def redis_connection_for_subscriptions
- ensure_reactor_running
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
- end
- end
+ def listener
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
def redis_connection_for_broadcasts
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
+ end
end
- def ensure_reactor_running
- return if EventMachine.reactor_running?
- @@mutex.synchronize do
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
- Thread.pass until EventMachine.reactor_running?
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+
+ @adapter = adapter
+
+ @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
+ @subscription_lock = Mutex.new
+
+ @raw_client = nil
+
+ @when_connected = []
+
+ @thread = nil
+ end
+
+ def listen(conn)
+ conn.without_reconnect do
+ original_client = conn.client
+
+ conn.subscribe('_action_cable_internal') do |on|
+ on.subscribe do |chan, count|
+ @subscription_lock.synchronize do
+ if count == 1
+ @raw_client = original_client
+
+ until @when_connected.empty?
+ @when_connected.shift.call
+ end
+ end
+
+ if callbacks = @subscribe_callbacks[chan]
+ next_callback = callbacks.shift
+ Concurrent.global_io_executor << next_callback if next_callback
+ @subscribe_callbacks.delete(chan) if callbacks.empty?
+ end
+ end
+ end
+
+ on.message do |chan, message|
+ broadcast(chan, message)
+ end
+
+ on.unsubscribe do |chan, count|
+ if count == 0
+ @subscription_lock.synchronize do
+ @raw_client = nil
+ end
+ end
+ end
+ end
+ end
+ end
+
+ def shutdown
+ @subscription_lock.synchronize do
+ return if @thread.nil?
+
+ when_connected do
+ send_command('unsubscribe')
+ @raw_client = nil
+ end
+ end
+
+ Thread.pass while @thread.alive?
+ end
+
+ def add_channel(channel, on_success)
+ @subscription_lock.synchronize do
+ ensure_listener_running
+ @subscribe_callbacks[channel] << on_success
+ when_connected { send_command('subscribe', channel) }
+ end
+ end
+
+ def remove_channel(channel)
+ @subscription_lock.synchronize do
+ when_connected { send_command('unsubscribe', channel) }
+ end
+ end
+
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
end
+
+ private
+ def ensure_listener_running
+ @thread ||= Thread.new do
+ Thread.current.abort_on_exception = true
+
+ conn = @adapter.redis_connection_for_subscriptions
+ listen conn
+ end
+ end
+
+ def when_connected(&block)
+ if @raw_client
+ block.call
+ else
+ @when_connected << block
+ end
+ end
+
+ def send_command(*command)
+ @raw_client.write(command)
+
+ very_raw_connection =
+ @raw_client.connection.instance_variable_defined?(:@connection) &&
+ @raw_client.connection.instance_variable_get(:@connection)
+
+ if very_raw_connection && very_raw_connection.respond_to?(:flush)
+ very_raw_connection.flush
+ end
+ end
end
end
end
diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb
index 6cf04ee61f..7bff3341c1 100644
--- a/actioncable/lib/rails/generators/channel/templates/channel.rb
+++ b/actioncable/lib/rails/generators/channel/templates/channel.rb
@@ -1,4 +1,4 @@
-# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading.
+# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading.
<% module_namespacing do -%>
class <%= class_name %>Channel < ApplicationCable::Channel
def subscribed
diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb
new file mode 100644
index 0000000000..63e35f194a
--- /dev/null
+++ b/actioncable/test/client/echo_channel.rb
@@ -0,0 +1,18 @@
+class EchoChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from "global"
+ end
+
+ def ding(data)
+ transmit(dong: data['message'])
+ end
+
+ def delay(data)
+ sleep 1
+ transmit(dong: data['message'])
+ end
+
+ def bulk(data)
+ ActionCable.server.broadcast "global", wide: data['message']
+ end
+end
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
new file mode 100644
index 0000000000..4ade9832e0
--- /dev/null
+++ b/actioncable/test/client_test.rb
@@ -0,0 +1,219 @@
+require 'test_helper'
+require 'concurrent'
+
+require 'active_support/core_ext/hash/indifferent_access'
+require 'pathname'
+
+require 'faye/websocket'
+require 'json'
+
+class ClientTest < ActionCable::TestCase
+ WAIT_WHEN_EXPECTING_EVENT = 3
+ WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2
+
+ def setup
+ # TODO: ActionCable requires a *lot* of setup at the moment...
+ ::Object.const_set(:ApplicationCable, Module.new)
+ ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))
+
+ ::Object.const_set(:Rails, Module.new)
+ ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }
+
+ ActionCable.instance_variable_set(:@server, nil)
+ server = ActionCable.server
+ server.config = ActionCable::Server::Configuration.new
+ inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
+ server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])
+
+ server.config.cable = { adapter: 'async' }.with_indifferent_access
+
+ # and now the "real" setup for our test:
+ server.config.disable_request_forgery_protection = true
+ server.config.channel_load_paths = [File.expand_path('client', __dir__)]
+
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+
+ # faye-websocket is warning-rich
+ @previous_verbose, $VERBOSE = $VERBOSE, nil
+ end
+
+ def teardown
+ $VERBOSE = @previous_verbose
+
+ begin
+ ::Object.send(:remove_const, :ApplicationCable)
+ rescue NameError
+ end
+ begin
+ ::Object.send(:remove_const, :Rails)
+ rescue NameError
+ end
+ end
+
+ def with_puma_server(rack_app = ActionCable.server, port = 3099)
+ server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
+ server.add_tcp_listener '127.0.0.1', port
+ server.min_threads = 1
+ server.max_threads = 4
+
+ t = Thread.new { server.run.join }
+ yield port
+
+ ensure
+ server.stop(true) if server
+ t.join if t
+ end
+
+ class SyncClient
+ attr_reader :pings
+
+ def initialize(port)
+ @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
+ @messages = Queue.new
+ @closed = Concurrent::Event.new
+ @has_messages = Concurrent::Event.new
+ @pings = 0
+
+ open = Concurrent::Event.new
+ error = nil
+
+ @ws.on(:error) do |event|
+ if open.set?
+ @messages << RuntimeError.new(event.message)
+ else
+ error = event.message
+ open.set
+ end
+ end
+
+ @ws.on(:open) do |event|
+ open.set
+ end
+
+ @ws.on(:message) do |event|
+ hash = JSON.parse(event.data)
+ if hash['identifier'] == '_ping'
+ @pings += 1
+ else
+ @messages << hash
+ @has_messages.set
+ end
+ end
+
+ @ws.on(:close) do |event|
+ @closed.set
+ end
+
+ open.wait(WAIT_WHEN_EXPECTING_EVENT)
+ raise error if error
+ end
+
+ def read_message
+ @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty?
+ @has_messages.reset if @messages.size < 2
+
+ msg = @messages.pop(true)
+ raise msg if msg.is_a?(Exception)
+
+ msg
+ end
+
+ def read_messages(expected_size = 0)
+ list = []
+ loop do
+ @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT)
+ if @has_messages.set?
+ list << read_message
+ else
+ break
+ end
+ end
+ list
+ end
+
+ def send_message(hash)
+ @ws.send(JSON.dump(hash))
+ end
+
+ def close
+ sleep WAIT_WHEN_NOT_EXPECTING_EVENT
+
+ unless @messages.empty?
+ raise "#{@messages.size} messages unprocessed"
+ end
+
+ @ws.close
+ @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
+ end
+ end
+
+ def faye_client(port)
+ SyncClient.new(port)
+ end
+
+ def test_single_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close
+ end
+ end
+
+ def test_interacting_clients
+ with_puma_server do |port|
+ clients = 10.times.map { faye_client(port) }
+
+ barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
+ barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
+
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello')
+ barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
+ assert_equal clients.size, c.read_messages(clients.size).size
+ } }.each(&:wait!)
+
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+
+ def test_many_clients
+ with_puma_server do |port|
+ clients = 100.times.map { faye_client(port) }
+
+ clients.map {|c| Concurrent::Future.execute {
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ } }.each(&:wait!)
+
+ clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
+ end
+ end
+
+ def test_disappearing_client
+ with_puma_server do |port|
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
+ c.close # disappear before write
+
+ c = faye_client(port)
+ c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
+ assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
+ c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
+ assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
+ c.close # disappear before read
+ end
+ end
+end
diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb
new file mode 100644
index 0000000000..70333e51bd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/evented_redis_test.rb
@@ -0,0 +1,10 @@
+require 'test_helper'
+require_relative './common'
+
+class EventedRedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def cable_config
+ { adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb
index 8d52832c87..4f34dd86c9 100644
--- a/actioncable/test/subscription_adapter/redis_test.rb
+++ b/actioncable/test/subscription_adapter/redis_test.rb
@@ -5,6 +5,12 @@ class RedisAdapterTest < ActionCable::TestCase
include CommonSubscriptionAdapterTest
def cable_config
- { adapter: 'redis', url: 'redis://127.0.0.1:6379/12' }
+ { adapter: 'redis', driver: 'ruby', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
+
+class RedisAdapterTest::Hiredis < RedisAdapterTest
+ def cable_config
+ super.merge(driver: 'hiredis')
end
end