diff options
Diffstat (limited to 'actioncable')
-rw-r--r-- | actioncable/actioncable.gemspec | 1 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 6 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/base.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/internal_channel.rb | 4 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/base.rb | 4 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/broadcasting.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/configuration.rb | 19 | ||||
-rw-r--r-- | actioncable/lib/action_cable/subscription_adapter.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/subscription_adapter/postgresql.rb | 8 | ||||
-rw-r--r-- | actioncable/lib/action_cable/subscription_adapter/redis.rb | 41 | ||||
-rw-r--r-- | actioncable/test/channel/stream_test.rb | 6 | ||||
-rw-r--r-- | actioncable/test/connection/identifier_test.rb | 8 | ||||
-rw-r--r-- | actioncable/test/stubs/test_connection.rb | 2 | ||||
-rw-r--r-- | actioncable/test/stubs/test_server.rb | 2 |
14 files changed, 52 insertions, 55 deletions
diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 847fcc71c3..a36acc8f6f 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' + s.add_dependency 'eventmachine', '~> 1.0' s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 89dcbdfa27..589946c3db 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -76,7 +76,7 @@ module ActionCable streams << [ broadcasting, callback ] EM.next_tick do - adapter.subscribe(broadcasting, callback, lambda do |reply| + pubsub.subscribe(broadcasting, callback, lambda do |reply| transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -92,13 +92,13 @@ module ActionCable def stop_all_streams streams.each do |broadcasting, callback| - adapter.unsubscribe broadcasting, callback + pubsub.unsubscribe broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end.clear end private - delegate :adapter, to: :connection + delegate :pubsub, to: :connection def streams @_streams ||= [] diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 2d7f99b09a..bb8850aaa0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,7 +49,7 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :adapter, to: :server + delegate :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index c618e9d087..54ed7672d2 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { adapter.subscribe(internal_channel, callback) } + EM.next_tick { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { adapter.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index f44d0fdfb7..3385a4c9f3 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -46,8 +46,8 @@ module ActionCable end # Adapter used for all streams/broadcasting. - def adapter - @adapter ||= config.subscription_adapter.new(self) + def pubsub + @pubsub ||= config.pubsub_adapter.new(self) end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 021589b82d..4a26ed9269 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -39,7 +39,7 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.adapter.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index cdf5e9eb1c..7bd67110a5 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -30,11 +30,20 @@ module ActionCable end end - # Returns constant of subscription adapter specified in config/cable.yml - # If the adapter cannot be found, this will default to the Redis adapter - def subscription_adapter - # Defaults to redis if no adapter is set - adapter = cable.fetch('adapter') { 'redis' } + # Returns constant of subscription adapter specified in config/cable.yml. + # If the adapter cannot be found, this will default to the Redis adapter. + # Also makes sure proper dependencies are required. + def pubsub_adapter + adapter = (cable.fetch('adapter') { 'redis' }) + path_to_adapter = "action_cable/subscription_adapter/#{adapter}" + begin + require path_to_adapter + rescue Gem::LoadError => e + raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)." + rescue LoadError => e + raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace + end + ### adapter.camelize adapter = 'PostgreSQL' if adapter == 'Postgresql' "ActionCable::SubscriptionAdapter::#{adapter}".constantize diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb index 287d2b9611..e770f4fb00 100644 --- a/actioncable/lib/action_cable/subscription_adapter.rb +++ b/actioncable/lib/action_cable/subscription_adapter.rb @@ -1,7 +1,5 @@ module ActionCable module SubscriptionAdapter autoload :Base, 'action_cable/subscription_adapter/base' - autoload :PostgreSQL, 'action_cable/subscription_adapter/postgresql' - autoload :Redis, 'action_cable/subscription_adapter/redis' end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index afa99355e8..64c519beed 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -1,11 +1,7 @@ +gem 'pg', '~> 0.18' +require 'pg' require 'thread' -begin - require 'pg' -rescue Gem::LoadError => e - raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." -end - module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index c6d8371f16..9615430be4 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,19 +1,18 @@ -begin - require 'em-hiredis' - require 'redis' -rescue Gem::LoadError => e - raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." -end +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' module ActionCable module SubscriptionAdapter - class Redis < Base + class Redis < Base # :nodoc: + # The redis instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) - redis_conn.publish(channel, payload) + broadcast_redis_connection.publish(channel, payload) end def subscribe(channel, message_callback, success_callback = nil) - hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result| + subscription_redis_connection.pubsub.subscribe(channel, &message_callback).tap do |result| result.callback(&success_callback) if success_callback end end @@ -23,23 +22,17 @@ module ActionCable end private - - # The redis instance used for broadcasting. Not intended for direct user use. - def redis_conn - @broadcast ||= ::Redis.new(@server.config.config_opts) - end - - # The EventMachine Redis instance used by the pubsub adapter. - def hi_redis_conn - @hi_redis_conn ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + def subscription_redis_connection + @subscription_redis_connection ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end end end - end - def redis_conn - @redis_conn ||= ::Redis.new(@server.config.cable) - end + + def broadcast_redis_connection + @broadcast_redis_connection ||= ::Redis.new(@server.config.cable) + end end end end diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 8424310ca2..3fa2b291b7 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -20,10 +20,10 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "streaming start and stop" do run_in_eventmachine do connection = TestConnection.new - connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "{id: 1}", { id: 1 } - connection.expects(:adapter).returns mock().tap { |m| m.expects(:unsubscribe) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel end end @@ -32,7 +32,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase run_in_eventmachine do connection = TestConnection.new EM.next_tick do - connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } end channel = ChatChannel.new connection, "" diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index bdc793e56d..a110dfdee0 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -23,12 +23,12 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase test "should subscribe to internal channel on open and unsubscribe on close" do run_in_eventmachine do - adapter = mock('adapter') - adapter.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc)) - adapter.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc)) + pubsub = mock('pubsub_adapter') + pubsub.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc)) + pubsub.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc)) server = TestServer.new - server.stubs(:adapter).returns(adapter) + server.stubs(:pubsub).returns(pubsub) open_connection server: server close_connection diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb index fe87dbcb36..da98201900 100644 --- a/actioncable/test/stubs/test_connection.rb +++ b/actioncable/test/stubs/test_connection.rb @@ -11,7 +11,7 @@ class TestConnection @transmissions = [] end - def adapter + def pubsub SuccessAdapter.new(TestServer.new) end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 067266ed57..6e6541a952 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -10,7 +10,7 @@ class TestServer @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter) end - def adapter + def pubsub @config.subscription_adapter.new(self) end |