aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/actioncable.gemspec1
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb6
-rw-r--r--actioncable/lib/action_cable/connection/base.rb2
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/server/base.rb4
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb2
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb19
-rw-r--r--actioncable/lib/action_cable/subscription_adapter.rb2
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb41
-rw-r--r--actioncable/test/channel/stream_test.rb6
-rw-r--r--actioncable/test/connection/identifier_test.rb8
-rw-r--r--actioncable/test/stubs/test_connection.rb2
-rw-r--r--actioncable/test/stubs/test_server.rb2
14 files changed, 52 insertions, 55 deletions
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec
index 847fcc71c3..a36acc8f6f 100644
--- a/actioncable/actioncable.gemspec
+++ b/actioncable/actioncable.gemspec
@@ -21,6 +21,7 @@ Gem::Specification.new do |s|
s.add_dependency 'actionpack', version
s.add_dependency 'coffee-rails', '~> 4.1.0'
+ s.add_dependency 'eventmachine', '~> 1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 89dcbdfa27..589946c3db 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -76,7 +76,7 @@ module ActionCable
streams << [ broadcasting, callback ]
EM.next_tick do
- adapter.subscribe(broadcasting, callback, lambda do |reply|
+ pubsub.subscribe(broadcasting, callback, lambda do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
@@ -92,13 +92,13 @@ module ActionCable
def stop_all_streams
streams.each do |broadcasting, callback|
- adapter.unsubscribe broadcasting, callback
+ pubsub.unsubscribe broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end
private
- delegate :adapter, to: :connection
+ delegate :pubsub, to: :connection
def streams
@_streams ||= []
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 2d7f99b09a..bb8850aaa0 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -49,7 +49,7 @@ module ActionCable
include Authorization
attr_reader :server, :env, :subscriptions, :logger
- delegate :worker_pool, :adapter, to: :server
+ delegate :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index c618e9d087..54ed7672d2 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- EM.next_tick { adapter.subscribe(internal_channel, callback) }
+ EM.next_tick { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| EM.next_tick { adapter.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index f44d0fdfb7..3385a4c9f3 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -46,8 +46,8 @@ module ActionCable
end
# Adapter used for all streams/broadcasting.
- def adapter
- @adapter ||= config.subscription_adapter.new(self)
+ def pubsub
+ @pubsub ||= config.pubsub_adapter.new(self)
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index 021589b82d..4a26ed9269 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -39,7 +39,7 @@ module ActionCable
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- server.adapter.broadcast broadcasting, ActiveSupport::JSON.encode(message)
+ server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index cdf5e9eb1c..7bd67110a5 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -30,11 +30,20 @@ module ActionCable
end
end
- # Returns constant of subscription adapter specified in config/cable.yml
- # If the adapter cannot be found, this will default to the Redis adapter
- def subscription_adapter
- # Defaults to redis if no adapter is set
- adapter = cable.fetch('adapter') { 'redis' }
+ # Returns constant of subscription adapter specified in config/cable.yml.
+ # If the adapter cannot be found, this will default to the Redis adapter.
+ # Also makes sure proper dependencies are required.
+ def pubsub_adapter
+ adapter = (cable.fetch('adapter') { 'redis' })
+ path_to_adapter = "action_cable/subscription_adapter/#{adapter}"
+ begin
+ require path_to_adapter
+ rescue Gem::LoadError => e
+ raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)."
+ rescue LoadError => e
+ raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace
+ end
+ ###
adapter.camelize
adapter = 'PostgreSQL' if adapter == 'Postgresql'
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb
index 287d2b9611..e770f4fb00 100644
--- a/actioncable/lib/action_cable/subscription_adapter.rb
+++ b/actioncable/lib/action_cable/subscription_adapter.rb
@@ -1,7 +1,5 @@
module ActionCable
module SubscriptionAdapter
autoload :Base, 'action_cable/subscription_adapter/base'
- autoload :PostgreSQL, 'action_cable/subscription_adapter/postgresql'
- autoload :Redis, 'action_cable/subscription_adapter/redis'
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index afa99355e8..64c519beed 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -1,11 +1,7 @@
+gem 'pg', '~> 0.18'
+require 'pg'
require 'thread'
-begin
- require 'pg'
-rescue Gem::LoadError => e
- raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)."
-end
-
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index c6d8371f16..9615430be4 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,19 +1,18 @@
-begin
- require 'em-hiredis'
- require 'redis'
-rescue Gem::LoadError => e
- raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)."
-end
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
module ActionCable
module SubscriptionAdapter
- class Redis < Base
+ class Redis < Base # :nodoc:
+ # The redis instance used for broadcasting. Not intended for direct user use.
def broadcast(channel, payload)
- redis_conn.publish(channel, payload)
+ broadcast_redis_connection.publish(channel, payload)
end
def subscribe(channel, message_callback, success_callback = nil)
- hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result|
+ subscription_redis_connection.pubsub.subscribe(channel, &message_callback).tap do |result|
result.callback(&success_callback) if success_callback
end
end
@@ -23,23 +22,17 @@ module ActionCable
end
private
-
- # The redis instance used for broadcasting. Not intended for direct user use.
- def redis_conn
- @broadcast ||= ::Redis.new(@server.config.config_opts)
- end
-
- # The EventMachine Redis instance used by the pubsub adapter.
- def hi_redis_conn
- @hi_redis_conn ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
+ def subscription_redis_connection
+ @subscription_redis_connection ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
end
end
- end
- def redis_conn
- @redis_conn ||= ::Redis.new(@server.config.cable)
- end
+
+ def broadcast_redis_connection
+ @broadcast_redis_connection ||= ::Redis.new(@server.config.cable)
+ end
end
end
end
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index 8424310ca2..3fa2b291b7 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -20,10 +20,10 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
test "streaming start and stop" do
run_in_eventmachine do
connection = TestConnection.new
- connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
- connection.expects(:adapter).returns mock().tap { |m| m.expects(:unsubscribe) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel
end
end
@@ -32,7 +32,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
run_in_eventmachine do
connection = TestConnection.new
EM.next_tick do
- connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) }
+ connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
end
channel = ChatChannel.new connection, ""
diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb
index bdc793e56d..a110dfdee0 100644
--- a/actioncable/test/connection/identifier_test.rb
+++ b/actioncable/test/connection/identifier_test.rb
@@ -23,12 +23,12 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
test "should subscribe to internal channel on open and unsubscribe on close" do
run_in_eventmachine do
- adapter = mock('adapter')
- adapter.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc))
- adapter.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc))
+ pubsub = mock('pubsub_adapter')
+ pubsub.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc))
+ pubsub.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc))
server = TestServer.new
- server.stubs(:adapter).returns(adapter)
+ server.stubs(:pubsub).returns(pubsub)
open_connection server: server
close_connection
diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb
index fe87dbcb36..da98201900 100644
--- a/actioncable/test/stubs/test_connection.rb
+++ b/actioncable/test/stubs/test_connection.rb
@@ -11,7 +11,7 @@ class TestConnection
@transmissions = []
end
- def adapter
+ def pubsub
SuccessAdapter.new(TestServer.new)
end
diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb
index 067266ed57..6e6541a952 100644
--- a/actioncable/test/stubs/test_server.rb
+++ b/actioncable/test/stubs/test_server.rb
@@ -10,7 +10,7 @@ class TestServer
@config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter)
end
- def adapter
+ def pubsub
@config.subscription_adapter.new(self)
end