aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/CHANGELOG.md9
-rw-r--r--actioncable/README.md18
-rw-r--r--actioncable/actioncable.gemspec8
-rw-r--r--actioncable/lib/action_cable.rb1
-rw-r--r--actioncable/lib/action_cable/channel/base.rb4
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb6
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb12
-rw-r--r--actioncable/lib/action_cable/engine.rb6
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb2
-rw-r--r--actioncable/lib/action_cable/server/base.rb17
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb9
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb24
-rw-r--r--actioncable/lib/action_cable/subscription_adapter.rb5
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb24
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb98
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb37
-rw-r--r--actioncable/test/channel/stream_test.rb10
-rw-r--r--actioncable/test/connection/identifier_test.rb8
-rw-r--r--actioncable/test/stubs/test_adapter.rb10
-rw-r--r--actioncable/test/stubs/test_connection.rb4
-rw-r--r--actioncable/test/stubs/test_server.rb6
-rw-r--r--actioncable/test/subscription_adapter/base_test.rb73
-rw-r--r--actioncable/test/test_helper.rb1
24 files changed, 329 insertions, 65 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md
index 22126d82b7..a33b7b6b4b 100644
--- a/actioncable/CHANGELOG.md
+++ b/actioncable/CHANGELOG.md
@@ -1,3 +1,12 @@
+* Create notion of an `ActionCable::SubscriptionAdapter`.
+ Separate out Redis functionality into
+ `ActionCable::SubscriptionAdapter::Redis`, and add a
+ PostgreSQL adapter as well. Configuration file for
+ ActionCable was changed from`config/redis/cable.yml` to
+ `config/cable.yml`.
+
+ *Jon Moss*
+
## Rails 5.0.0.beta1 (December 18, 2015) ##
* Added to Rails!
diff --git a/actioncable/README.md b/actioncable/README.md
index 636f9935cf..63e328321b 100644
--- a/actioncable/README.md
+++ b/actioncable/README.md
@@ -66,6 +66,13 @@ end
Here `identified_by` is a connection identifier that can be used to find the specific connection again or later.
Note that anything marked as an identifier will automatically create a delegate by the same name on any channel instances created off the connection.
+This relies on the fact that you will already have handled authentication of the user, and
+that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
+automatically sent to the connection instance when a new connection is attempted, and you
+use that to set the `current_user`. By identifying the connection by this same current_user,
+you're also ensuring that you can later retrieve all open connections by a given user (and
+potentially disconnect them all if the user is deleted or deauthorized).
+
Then you should define your `ApplicationCable::Channel` class in Ruby. This is the place where you put
shared logic between your channels.
@@ -77,13 +84,6 @@ module ApplicationCable
end
```
-This relies on the fact that you will already have handled authentication of the user, and
-that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
-automatically sent to the connection instance when a new connection is attempted, and you
-use that to set the `current_user`. By identifying the connection by this same current_user,
-you're also ensuring that you can later retrieve all open connections by a given user (and
-potentially disconnect them all if the user is deleted or deauthorized).
-
The client-side needs to setup a consumer instance of this connection. That's done like so:
```coffeescript
@@ -302,7 +302,7 @@ Action Cable has three required configurations: the Redis connection, allowed re
### Redis
-By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/redis/cable.yml')`.
+By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/cable.yml')`.
This file must specify a Redis url for each Rails environment. It may use the following format:
```yaml
@@ -316,7 +316,7 @@ test: *development
You can also change the location of the Redis config file in a Rails initializer with something like:
```ruby
-Rails.application.paths.add "config/redis/cable", with: "somewhere/else/cable.yml"
+Rails.application.paths.add "config/cable", with: "somewhere/else/cable.yml"
```
### Allowed Request Origins
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index a04fc932aa..a36acc8f6f 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -21,11 +21,13 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version
s.add_dependency 'coffee-rails', '~> 4.1.0'
+ s.add_dependency 'eventmachine', '~> 1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
- s.add_dependency 'em-hiredis', '~> 0.3.0'
- s.add_dependency 'redis', '~> 3.0'
- s.add_development_dependency 'puma'
+ s.add_development_dependency 'em-hiredis', '~> 0.3.0'
s.add_development_dependency 'mocha'
+ s.add_development_dependency 'pg'
+ s.add_development_dependency 'puma'
+ s.add_development_dependency 'redis', '~> 3.0'
end
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb
index 97f485b32e..1dc66ef3ad 100644
--- a/actioncable/lib/action_cable.rb
+++ b/actioncable/lib/action_cable.rb
@@ -47,4 +47,5 @@ module ActionCable
autoload :Connection
autoload :Channel
autoload :RemoteConnections
+ autoload :SubscriptionAdapter
end
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index ce9d62635c..88cdc1cab1 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -133,8 +133,8 @@ module ActionCable
@identifier = identifier
@params = params
- # When a channel is streaming via redis pubsub, we want to delay the confirmation
- # transmission until redis pubsub subscription is confirmed.
+ # When a channel is streaming via pubsub, we want to delay the confirmation
+ # transmission until pubsub subscription is confirmed.
@defer_subscription_confirmation = false
@reject_subscription = nil
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index b5ffa17f72..589946c3db 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -76,10 +76,10 @@ module ActionCable
streams << [ broadcasting, callback ]
EM.next_tick do
- pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ pubsub.subscribe(broadcasting, callback, lambda do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
- end
+ end)
end
end
@@ -92,7 +92,7 @@ module ActionCable
def stop_all_streams
streams.each do |broadcasting, callback|
- pubsub.unsubscribe_proc broadcasting, callback
+ pubsub.unsubscribe broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index a8cfdf90f3..bb8850aaa0 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -60,7 +60,7 @@ module ActionCable
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
- @_internal_redis_subscriptions = nil
+ @_internal_subscriptions = nil
@started_at = Time.now
end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index c065a24ab7..54ed7672d2 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -5,24 +5,24 @@ module ActionCable
extend ActiveSupport::Concern
private
- def internal_redis_channel
+ def internal_channel
"action_cable/#{connection_identifier}"
end
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
- @_internal_redis_subscriptions ||= []
- @_internal_redis_subscriptions << [ internal_redis_channel, callback ]
+ @_internal_subscriptions ||= []
+ @_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
+ EM.next_tick { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
- if @_internal_redis_subscriptions.present?
- @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
+ if @_internal_subscriptions.present?
+ @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index 2d3caa5b0a..f5e233e091 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -24,11 +24,11 @@ module ActionCable
options = app.config.action_cable
options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
- app.paths.add "config/redis/cable", with: "config/redis/cable.yml"
+ app.paths.add "config/cable", with: "config/cable.yml"
ActiveSupport.on_load(:action_cable) do
- if (redis_cable_path = Pathname.new(app.config.paths["config/redis/cable"].first)).exist?
- self.redis = Rails.application.config_for(redis_cable_path).with_indifferent_access
+ if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist?
+ self.cable = Rails.application.config_for(config_path).with_indifferent_access
end
options.each { |k,v| send("#{k}=", v) }
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
index 1230d905ad..aa2fc95d2f 100644
--- a/actioncable/lib/action_cable/remote_connections.rb
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -39,7 +39,7 @@ module ActionCable
# Uses the internal channel to disconnect the connection.
def disconnect
- server.broadcast internal_redis_channel, type: 'disconnect'
+ server.broadcast internal_channel, type: 'disconnect'
end
# Returns all the identifiers that were applied to this connection.
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index 3785bbd154..3385a4c9f3 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,5 +1,3 @@
-require 'em-hiredis'
-
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -47,20 +45,9 @@ module ActionCable
end
end
- # The redis pubsub adapter used for all streams/broadcasting.
+ # Adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= redis.pubsub
- end
-
- # The EventMachine Redis instance used by the pubsub adapter.
- def redis
- @redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- logger.info "[ActionCable] Redis reconnect failed."
- # logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
- # @connections.map &:close
- end
- end
+ @pubsub ||= config.pubsub_adapter.new(self)
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index c759239a0e..4a26ed9269 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -1,5 +1,3 @@
-require 'redis'
-
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
@@ -31,11 +29,6 @@ module ActionCable
Broadcaster.new(self, broadcasting)
end
- # The redis instance used for broadcasting. Not intended for direct user use.
- def broadcasting_redis
- @broadcasting_redis ||= Redis.new(config.redis)
- end
-
private
class Broadcaster
attr_reader :server, :broadcasting
@@ -46,7 +39,7 @@ module ActionCable
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
+ server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 935133cbba..ebbf60c6e2 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -5,9 +5,9 @@ module ActionCable
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
- attr_accessor :redis, :channels_path
+ attr_accessor :channels_path
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
- attr_accessor :url
+ attr_accessor :cable, :url
def initialize
@log_tags = []
@@ -29,7 +29,25 @@ module ActionCable
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
end
end
+
+ # Returns constant of subscription adapter specified in config/cable.yml.
+ # If the adapter cannot be found, this will default to the Redis adapter.
+ # Also makes sure proper dependencies are required.
+ def pubsub_adapter
+ adapter = (cable.fetch('adapter') { 'redis' })
+ path_to_adapter = "action_cable/subscription_adapter/#{adapter}"
+ begin
+ require path_to_adapter
+ rescue Gem::LoadError => e
+ raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)."
+ rescue LoadError => e
+ raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace
+ end
+
+ adapter = adapter.camelize
+ adapter = 'PostgreSQL' if adapter == 'Postgresql'
+ "ActionCable::SubscriptionAdapter::#{adapter}".constantize
+ end
end
end
end
-
diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb
new file mode 100644
index 0000000000..e770f4fb00
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter.rb
@@ -0,0 +1,5 @@
+module ActionCable
+ module SubscriptionAdapter
+ autoload :Base, 'action_cable/subscription_adapter/base'
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
new file mode 100644
index 0000000000..11910803e8
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -0,0 +1,24 @@
+module ActionCable
+ module SubscriptionAdapter
+ class Base
+ attr_reader :logger, :server
+
+ def initialize(server)
+ @server = server
+ @logger = @server.logger
+ end
+
+ def broadcast(channel, payload)
+ raise NotImplementedError
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ raise NotImplementedError
+ end
+
+ def unsubscribe(channel, message_callback)
+ raise NotImplementedError
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
new file mode 100644
index 0000000000..6465663c97
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -0,0 +1,98 @@
+gem 'pg', '~> 0.18'
+require 'pg'
+require 'thread'
+
+module ActionCable
+ module SubscriptionAdapter
+ class PostgreSQL < Base # :nodoc:
+ def broadcast(channel, payload)
+ with_connection do |pg_conn|
+ pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
+ end
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ listener.subscribe_to(channel, callback, success_callback)
+ end
+
+ def unsubscribe(channel, callback)
+ listener.unsubscribe_from(channel, callback)
+ end
+
+ def with_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+
+ unless pg_conn.is_a?(PG::Connection)
+ raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
+ end
+
+ yield pg_conn
+ end
+ end
+
+ private
+ def listener
+ @listener ||= Listener.new(self)
+ end
+
+ class Listener
+ def initialize(adapter)
+ @adapter = adapter
+ @subscribers = Hash.new { |h,k| h[k] = [] }
+ @sync = Mutex.new
+ @queue = Queue.new
+
+ Thread.new do
+ Thread.current.abort_on_exception = true
+ listen
+ end
+ end
+
+ def listen
+ @adapter.with_connection do |pg_conn|
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+ escaped_channel = pg_conn.escape_identifier(channel)
+
+ if action == :listen
+ pg_conn.exec("LISTEN #{escaped_channel}")
+ ::EM.next_tick(&callback) if callback
+ elsif action == :unlisten
+ pg_conn.exec("UNLISTEN #{escaped_channel}")
+ end
+ end
+
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ @subscribers[chan].each do |callback|
+ ::EM.next_tick { callback.call(message) }
+ end
+ end
+ end
+ end
+ end
+
+ def subscribe_to(channel, callback, success_callback)
+ @sync.synchronize do
+ if @subscribers[channel].empty?
+ @queue.push([:listen, channel, success_callback])
+ end
+
+ @subscribers[channel] << callback
+ end
+ end
+
+ def unsubscribe_from(channel, callback)
+ @sync.synchronize do
+ @subscribers[channel].delete(callback)
+
+ if @subscribers[channel].empty?
+ @queue.push([:unlisten, channel])
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
new file mode 100644
index 0000000000..e42ab2a03f
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -0,0 +1,37 @@
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+
+module ActionCable
+ module SubscriptionAdapter
+ class Redis < Base # :nodoc:
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback(&success_callback) if success_callback
+ end
+ end
+
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ private
+ def redis_connection_for_subscriptions
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+ end
+end
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index 1424ded04c..3fa2b291b7 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -20,10 +20,10 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "streaming start and stop" do
run_in_eventmachine do
connection = TestConnection.new
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel
end
end
@@ -32,7 +32,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
run_in_eventmachine do
connection = TestConnection.new
EM.next_tick do
- connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
end
channel = ChatChannel.new connection, ""
@@ -43,13 +43,14 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "stream_from subscription confirmation" do
EM.run do
connection = TestConnection.new
- connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub
ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission
EM::Timer.new(0.1) do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
+ connection.transmit(expected)
+
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
EM.run_deferred_callbacks
@@ -61,7 +62,6 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "subscription confirmation should only be sent out once" do
EM.run do
connection = TestConnection.new
- connection.stubs(:pubsub).returns EM::Hiredis.connect.pubsub
channel = ChatChannel.new connection, "test_channel"
channel.send_confirmation
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
index 02e6b21845..a110dfdee0 100644
--- a/actioncable/test/connection/identifier_test.rb
+++ b/actioncable/test/connection/identifier_test.rb
@@ -23,9 +23,9 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
test "should subscribe to internal channel on open and unsubscribe on close" do
run_in_eventmachine do
- pubsub = mock('pubsub')
- pubsub.expects(:subscribe).with('action_cable/User#lifo')
- pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc))
+ pubsub = mock('pubsub_adapter')
+ pubsub.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc))
+ pubsub.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc))
server = TestServer.new
server.stubs(:pubsub).returns(pubsub)
@@ -58,7 +58,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
protected
def open_connection_with_stubbed_pubsub
server = TestServer.new
- server.stubs(:pubsub).returns(stub_everything('pubsub'))
+ server.stubs(:adapter).returns(stub_everything('adapter'))
open_connection server: server
end
diff --git a/actioncable/test/stubs/test_adapter.rb b/actioncable/test/stubs/test_adapter.rb
new file mode 100644
index 0000000000..bbd142b287
--- /dev/null
+++ b/actioncable/test/stubs/test_adapter.rb
@@ -0,0 +1,10 @@
+class SuccessAdapter < ActionCable::SubscriptionAdapter::Base
+ def broadcast(channel, payload)
+ end
+
+ def subscribe(channel, callback, success_callback = nil)
+ end
+
+ def unsubscribe(channel, callback)
+ end
+end
diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb
index 384abc5e76..da98201900 100644
--- a/actioncable/test/stubs/test_connection.rb
+++ b/actioncable/test/stubs/test_connection.rb
@@ -11,6 +11,10 @@ class TestConnection
@transmissions = []
end
+ def pubsub
+ SuccessAdapter.new(TestServer.new)
+ end
+
def transmit(data)
@transmissions << data
end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index f9168f9b78..6e6541a952 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -7,7 +7,11 @@ class TestServer
def initialize
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
- @config = OpenStruct.new(log_tags: [])
+ @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter)
+ end
+
+ def pubsub
+ @config.subscription_adapter.new(self)
end
def send_async
diff --git a/actioncable/test/subscription_adapter/base_test.rb b/actioncable/test/subscription_adapter/base_test.rb
new file mode 100644
index 0000000000..7a7ae131e6
--- /dev/null
+++ b/actioncable/test/subscription_adapter/base_test.rb
@@ -0,0 +1,73 @@
+require 'test_helper'
+require 'stubs/test_server'
+
+class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase
+ ## TEST THAT ERRORS ARE RETURNED FOR INHERITORS THAT DON'T OVERRIDE METHODS
+
+ class BrokenAdapter < ActionCable::SubscriptionAdapter::Base
+ end
+
+ setup do
+ @server = TestServer.new
+ @server.config.subscription_adapter = BrokenAdapter
+ @server.config.allowed_request_origins = %w( http://rubyonrails.com )
+ end
+
+ test "#broadcast returns NotImplementedError by default" do
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).broadcast('channel', 'payload')
+ end
+ end
+
+ test "#subscribe returns NotImplementedError by default" do
+ callback = lambda { puts 'callback' }
+ success_callback = lambda { puts 'success' }
+
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).subscribe('channel', callback, success_callback)
+ end
+ end
+
+ test "#unsubscribe returns NotImplementedError by default" do
+ callback = lambda { puts 'callback' }
+
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).unsubscribe('channel', callback)
+ end
+ end
+
+ # TEST METHODS THAT ARE REQUIRED OF THE ADAPTER'S BACKEND STORAGE OBJECT
+
+ test "#broadcast is implemented" do
+ broadcast = SuccessAdapter.new(@server).broadcast('channel', 'payload')
+
+ assert_respond_to(SuccessAdapter.new(@server), :broadcast)
+
+ assert_nothing_raised NotImplementedError do
+ broadcast
+ end
+ end
+
+ test "#subscribe is implemented" do
+ callback = lambda { puts 'callback' }
+ success_callback = lambda { puts 'success' }
+ subscribe = SuccessAdapter.new(@server).subscribe('channel', callback, success_callback)
+
+ assert_respond_to(SuccessAdapter.new(@server), :subscribe)
+
+ assert_nothing_raised NotImplementedError do
+ subscribe
+ end
+ end
+
+ test "#unsubscribe is implemented" do
+ callback = lambda { puts 'callback' }
+ unsubscribe = SuccessAdapter.new(@server).unsubscribe('channel', callback)
+
+ assert_respond_to(SuccessAdapter.new(@server), :unsubscribe)
+
+ assert_nothing_raised NotImplementedError do
+ unsubscribe
+ end
+ end
+end
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index 325305939f..65b45e0c89 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -5,7 +5,6 @@ require 'active_support/testing/autorun'
require 'puma'
-require 'em-hiredis'
require 'mocha/setup'