aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Moss <me@jonathanmoss.me>2016-01-06 17:16:02 -0500
committerJon Moss <me@jonathanmoss.me>2016-01-18 18:58:57 -0500
commit0016e0410b11d40a1d730a1232c40f428d67abeb (patch)
tree172b2fe1e0e2424a2eab33c95ec38487d5b5996d
parent75f1b229fdb340b2cdaf632bb6a223213f05dc75 (diff)
downloadrails-0016e0410b11d40a1d730a1232c40f428d67abeb.tar.gz
rails-0016e0410b11d40a1d730a1232c40f428d67abeb.tar.bz2
rails-0016e0410b11d40a1d730a1232c40f428d67abeb.zip
Adapterize ActionCable storage and extract behavior
-rw-r--r--actioncable/lib/action_cable.rb1
-rw-r--r--actioncable/lib/action_cable/channel/base.rb4
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb12
-rw-r--r--actioncable/lib/action_cable/engine.rb6
-rw-r--r--actioncable/lib/action_cable/remote_connections.rb2
-rw-r--r--actioncable/lib/action_cable/server/base.rb17
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb10
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb20
-rw-r--r--actioncable/lib/action_cable/storage_adapter.rb6
-rw-r--r--actioncable/lib/action_cable/storage_adapter/base.rb22
-rw-r--r--actioncable/lib/action_cable/storage_adapter/redis.rb30
-rw-r--r--actioncable/test/storage_adapter/base_test.rb64
-rw-r--r--railties/lib/rails/generators/rails/app/app_generator.rb4
-rw-r--r--railties/lib/rails/generators/rails/app/templates/config/cable.yml12
-rw-r--r--railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml9
-rw-r--r--railties/test/generators/app_generator_test.rb2
17 files changed, 173 insertions, 50 deletions
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb
index 97f485b32e..5cc29ecd00 100644
--- a/actioncable/lib/action_cable.rb
+++ b/actioncable/lib/action_cable.rb
@@ -47,4 +47,5 @@ module ActionCable
autoload :Connection
autoload :Channel
autoload :RemoteConnections
+ autoload :StorageAdapter
end
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index ce9d62635c..88cdc1cab1 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -133,8 +133,8 @@ module ActionCable
@identifier = identifier
@params = params
- # When a channel is streaming via redis pubsub, we want to delay the confirmation
- # transmission until redis pubsub subscription is confirmed.
+ # When a channel is streaming via pubsub, we want to delay the confirmation
+ # transmission until pubsub subscription is confirmed.
@defer_subscription_confirmation = false
@reject_subscription = nil
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index a8cfdf90f3..bb8850aaa0 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -60,7 +60,7 @@ module ActionCable
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
- @_internal_redis_subscriptions = nil
+ @_internal_subscriptions = nil
@started_at = Time.now
end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index c065a24ab7..63ba293877 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -5,24 +5,24 @@ module ActionCable
extend ActiveSupport::Concern
private
- def internal_redis_channel
+ def internal_channel
"action_cable/#{connection_identifier}"
end
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
- @_internal_redis_subscriptions ||= []
- @_internal_redis_subscriptions << [ internal_redis_channel, callback ]
+ @_internal_subscriptions ||= []
+ @_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
+ EM.next_tick { pubsub.subscribe(internal_channel, &callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
- if @_internal_redis_subscriptions.present?
- @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
+ if @_internal_subscriptions.present?
+ @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index 2d3caa5b0a..193f54333e 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -24,11 +24,11 @@ module ActionCable
options = app.config.action_cable
options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
- app.paths.add "config/redis/cable", with: "config/redis/cable.yml"
+ app.paths.add "config/cable", with: "config/cable.yml"
ActiveSupport.on_load(:action_cable) do
- if (redis_cable_path = Pathname.new(app.config.paths["config/redis/cable"].first)).exist?
- self.redis = Rails.application.config_for(redis_cable_path).with_indifferent_access
+ if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist?
+ self.config_opts = Rails.application.config_for(config_path).with_indifferent_access
end
options.each { |k,v| send("#{k}=", v) }
diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb
index 1230d905ad..aa2fc95d2f 100644
--- a/actioncable/lib/action_cable/remote_connections.rb
+++ b/actioncable/lib/action_cable/remote_connections.rb
@@ -39,7 +39,7 @@ module ActionCable
# Uses the internal channel to disconnect the connection.
def disconnect
- server.broadcast internal_redis_channel, type: 'disconnect'
+ server.broadcast internal_channel, type: 'disconnect'
end
# Returns all the identifiers that were applied to this connection.
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index 3785bbd154..6539745c79 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,5 +1,3 @@
-require 'em-hiredis'
-
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -47,20 +45,9 @@ module ActionCable
end
end
- # The redis pubsub adapter used for all streams/broadcasting.
+ # The pubsub adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= redis.pubsub
- end
-
- # The EventMachine Redis instance used by the pubsub adapter.
- def redis
- @redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- logger.info "[ActionCable] Redis reconnect failed."
- # logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
- # @connections.map &:close
- end
- end
+ @pubsub ||= config.storage_adapter.new(self).pubsub
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index c759239a0e..847ef50971 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -1,5 +1,3 @@
-require 'redis'
-
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
@@ -31,11 +29,6 @@ module ActionCable
Broadcaster.new(self, broadcasting)
end
- # The redis instance used for broadcasting. Not intended for direct user use.
- def broadcasting_redis
- @broadcasting_redis ||= Redis.new(config.redis)
- end
-
private
class Broadcaster
attr_reader :server, :broadcasting
@@ -46,7 +39,8 @@ module ActionCable
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
+ broadcast_storage_adapter = server.config.storage_adapter.new(server).broadcast
+ broadcast_storage_adapter.publish broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 935133cbba..2bed5a9ea2 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -5,9 +5,9 @@ module ActionCable
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
- attr_accessor :redis, :channels_path
+ attr_accessor :channels_path
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
- attr_accessor :url
+ attr_accessor :config_opts, :url
def initialize
@log_tags = []
@@ -29,6 +29,22 @@ module ActionCable
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
end
end
+
+ ADAPTER = ActionCable::StorageAdapter
+
+ # Returns constant of storage adapter specified in config/cable.yml
+ # If the adapter cannot be found, this will default to the Redis adapter
+ def storage_adapter
+ # "ActionCable::StorageAdapter::#{adapter.capitalize}"
+ adapter = config_opts['adapter']
+ adapter_const = "ActionCable::StorageAdapter::#{adapter.capitalize}"
+
+ if Object.const_defined?(adapter_const)
+ adapter_const.constantize
+ else
+ ADAPTER_BASE::Redis
+ end
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb
new file mode 100644
index 0000000000..991270d2b3
--- /dev/null
+++ b/actioncable/lib/action_cable/storage_adapter.rb
@@ -0,0 +1,6 @@
+module ActionCable
+ module StorageAdapter
+ autoload :Base, 'action_cable/storage_adapter/base'
+ autoload :Redis, 'action_cable/storage_adapter/redis'
+ end
+end
diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb
new file mode 100644
index 0000000000..26b3ded676
--- /dev/null
+++ b/actioncable/lib/action_cable/storage_adapter/base.rb
@@ -0,0 +1,22 @@
+module ActionCable
+ module StorageAdapter
+ class Base
+ attr_reader :logger, :server
+
+ def initialize(server)
+ @server = server
+ @logger = @server.logger
+ end
+
+ # Storage connection instance used for broadcasting. Not intended for direct user use.
+ def broadcast
+ raise NotImplementedError
+ end
+
+ # Storage connection instance used for pubsub.
+ def pubsub
+ raise NotImplementedError
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb
new file mode 100644
index 0000000000..7b712b9b03
--- /dev/null
+++ b/actioncable/lib/action_cable/storage_adapter/redis.rb
@@ -0,0 +1,30 @@
+require 'em-hiredis'
+require 'redis'
+
+module ActionCable
+ module StorageAdapter
+ class Redis < Base
+ # The redis instance used for broadcasting. Not intended for direct user use.
+ def broadcast
+ @broadcast ||= ::Redis.new(@server.config.config_opts)
+ end
+
+ def pubsub
+ redis.pubsub
+ end
+
+ private
+
+ # The EventMachine Redis instance used by the pubsub adapter.
+ def redis
+ @redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ # logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
+ # @connections.map &:close
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/test/storage_adapter/base_test.rb b/actioncable/test/storage_adapter/base_test.rb
new file mode 100644
index 0000000000..e4a25fcfd4
--- /dev/null
+++ b/actioncable/test/storage_adapter/base_test.rb
@@ -0,0 +1,64 @@
+require 'test_helper'
+require 'stubs/test_server'
+
+class ActionCable::StorageAdapter::BaseTest < ActionCable::TestCase
+ ## TEST THAT ERRORS ARE RETURNED FOR INHERITORS THAT DON'T OVERRIDE METHODS
+
+ class BrokenAdapter < ActionCable::StorageAdapter::Base
+ end
+
+ setup do
+ @server = TestServer.new
+ @server.config.allowed_request_origins = %w( http://rubyonrails.com )
+ end
+
+ test "#broadcast returns NotImplementedError by default" do
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).broadcast
+ end
+ end
+
+ test "#pubsub returns NotImplementedError by default" do
+ assert_raises NotImplementedError do
+ BrokenAdapter.new(@server).pubsub
+ end
+ end
+
+ # TEST METHODS THAT ARE REQUIRED OF THE ADAPTER'S BACKEND STORAGE OBJECT
+
+ class SuccessAdapterBackend
+ def publish(channel, message)
+ end
+
+ def subscribe(*channels, &block)
+ end
+
+ def unsubscribe(*channels, &block)
+ end
+ end
+
+ class SuccessAdapter < ActionCable::StorageAdapter::Base
+ def broadcast
+ SuccessAdapterBackend.new
+ end
+
+ def pubsub
+ SuccessAdapterBackend.new
+ end
+ end
+
+ test "#broadcast responds to #publish" do
+ broadcast = SuccessAdapter.new(@server).broadcast
+ assert_respond_to(broadcast, :publish)
+ end
+
+ test "#pubsub responds to #subscribe" do
+ pubsub = SuccessAdapter.new(@server).pubsub
+ assert_respond_to(pubsub, :subscribe)
+ end
+
+ test "#pubsub responds to #unsubscribe" do
+ pubsub = SuccessAdapter.new(@server).pubsub
+ assert_respond_to(pubsub, :unsubscribe)
+ end
+end
diff --git a/railties/lib/rails/generators/rails/app/app_generator.rb b/railties/lib/rails/generators/rails/app/app_generator.rb
index f4deec7135..7ec4d3bbd3 100644
--- a/railties/lib/rails/generators/rails/app/app_generator.rb
+++ b/railties/lib/rails/generators/rails/app/app_generator.rb
@@ -78,11 +78,11 @@ module Rails
template "application.rb"
template "environment.rb"
template "secrets.yml"
+ template "cable.yml" unless options[:skip_action_cable]
directory "environments"
directory "initializers"
directory "locales"
- directory "redis" unless options[:skip_action_cable]
end
end
@@ -315,7 +315,7 @@ module Rails
def delete_action_cable_files_skipping_action_cable
if options[:skip_action_cable]
- remove_file 'config/redis/cable.yml'
+ remove_file 'config/cable.yml'
remove_file 'app/assets/javascripts/cable.coffee'
remove_dir 'app/channels'
end
diff --git a/railties/lib/rails/generators/rails/app/templates/config/cable.yml b/railties/lib/rails/generators/rails/app/templates/config/cable.yml
new file mode 100644
index 0000000000..004adb7b3c
--- /dev/null
+++ b/railties/lib/rails/generators/rails/app/templates/config/cable.yml
@@ -0,0 +1,12 @@
+# Action Cable uses Redis by default to administer connections, channels, and sending/receiving messages over the WebSocket.
+production:
+ adapter: redis
+ url: redis://localhost:6379/1
+
+development:
+ adapter: redis
+ url: redis://localhost:6379/2
+
+test:
+ adapter: redis
+ url: redis://localhost:6379/3
diff --git a/railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml b/railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml
deleted file mode 100644
index 0176be24f9..0000000000
--- a/railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-# Action Cable uses Redis to administer connections, channels, and sending/receiving messages over the WebSocket.
-production:
- url: redis://localhost:6379/1
-
-development:
- url: redis://localhost:6379/2
-
-test:
- url: redis://localhost:6379/3
diff --git a/railties/test/generators/app_generator_test.rb b/railties/test/generators/app_generator_test.rb
index e5480180ce..cabbc802e1 100644
--- a/railties/test/generators/app_generator_test.rb
+++ b/railties/test/generators/app_generator_test.rb
@@ -392,7 +392,7 @@ class AppGeneratorTest < Rails::Generators::TestCase
def test_generator_if_skip_action_cable_is_given
run_generator [destination_root, "--skip-action-cable"]
assert_file "config/application.rb", /#\s+require\s+["']action_cable\/engine["']/
- assert_no_file "config/redis/cable.yml"
+ assert_no_file "config/cable.yml"
assert_no_file "app/assets/javascripts/cable.coffee"
assert_no_file "app/channels"
end