diff options
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r-- | actioncable/lib/action_cable/channel/base.rb | 4 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/base.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/internal_channel.rb | 12 | ||||
-rw-r--r-- | actioncable/lib/action_cable/engine.rb | 6 | ||||
-rw-r--r-- | actioncable/lib/action_cable/remote_connections.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/base.rb | 17 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/broadcasting.rb | 10 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/configuration.rb | 20 | ||||
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter.rb | 6 | ||||
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/base.rb | 22 | ||||
-rw-r--r-- | actioncable/lib/action_cable/storage_adapter/redis.rb | 30 |
11 files changed, 93 insertions, 38 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index ce9d62635c..88cdc1cab1 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -133,8 +133,8 @@ module ActionCable @identifier = identifier @params = params - # When a channel is streaming via redis pubsub, we want to delay the confirmation - # transmission until redis pubsub subscription is confirmed. + # When a channel is streaming via pubsub, we want to delay the confirmation + # transmission until pubsub subscription is confirmed. @defer_subscription_confirmation = false @reject_subscription = nil diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index a8cfdf90f3..bb8850aaa0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -60,7 +60,7 @@ module ActionCable @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) - @_internal_redis_subscriptions = nil + @_internal_subscriptions = nil @started_at = Time.now end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index c065a24ab7..63ba293877 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -5,24 +5,24 @@ module ActionCable extend ActiveSupport::Concern private - def internal_redis_channel + def internal_channel "action_cable/#{connection_identifier}" end def subscribe_to_internal_channel if connection_identifier.present? callback = -> (message) { process_internal_message(message) } - @_internal_redis_subscriptions ||= [] - @_internal_redis_subscriptions << [ internal_redis_channel, callback ] + @_internal_subscriptions ||= [] + @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) } + EM.next_tick { pubsub.subscribe(internal_channel, &callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel - if @_internal_redis_subscriptions.present? - @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } + if @_internal_subscriptions.present? + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index 2d3caa5b0a..193f54333e 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -24,11 +24,11 @@ module ActionCable options = app.config.action_cable options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development? - app.paths.add "config/redis/cable", with: "config/redis/cable.yml" + app.paths.add "config/cable", with: "config/cable.yml" ActiveSupport.on_load(:action_cable) do - if (redis_cable_path = Pathname.new(app.config.paths["config/redis/cable"].first)).exist? - self.redis = Rails.application.config_for(redis_cable_path).with_indifferent_access + if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist? + self.config_opts = Rails.application.config_for(config_path).with_indifferent_access end options.each { |k,v| send("#{k}=", v) } diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index 1230d905ad..aa2fc95d2f 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -39,7 +39,7 @@ module ActionCable # Uses the internal channel to disconnect the connection. def disconnect - server.broadcast internal_redis_channel, type: 'disconnect' + server.broadcast internal_channel, type: 'disconnect' end # Returns all the identifiers that were applied to this connection. diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3785bbd154..6539745c79 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,5 +1,3 @@ -require 'em-hiredis' - module ActionCable module Server # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but @@ -47,20 +45,9 @@ module ActionCable end end - # The redis pubsub adapter used for all streams/broadcasting. + # The pubsub adapter used for all streams/broadcasting. def pubsub - @pubsub ||= redis.pubsub - end - - # The EventMachine Redis instance used by the pubsub adapter. - def redis - @redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis| - redis.on(:reconnect_failed) do - logger.info "[ActionCable] Redis reconnect failed." - # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." - # @connections.map &:close - end - end + @pubsub ||= config.storage_adapter.new(self).pubsub end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index c759239a0e..847ef50971 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -1,5 +1,3 @@ -require 'redis' - module ActionCable module Server # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these @@ -31,11 +29,6 @@ module ActionCable Broadcaster.new(self, broadcasting) end - # The redis instance used for broadcasting. Not intended for direct user use. - def broadcasting_redis - @broadcasting_redis ||= Redis.new(config.redis) - end - private class Broadcaster attr_reader :server, :broadcasting @@ -46,7 +39,8 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message) + broadcast_storage_adapter = server.config.storage_adapter.new(server).broadcast + broadcast_storage_adapter.publish broadcasting, ActiveSupport::JSON.encode(message) end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 935133cbba..2bed5a9ea2 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -5,9 +5,9 @@ module ActionCable class Configuration attr_accessor :logger, :log_tags attr_accessor :connection_class, :worker_pool_size - attr_accessor :redis, :channels_path + attr_accessor :channels_path attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :url + attr_accessor :config_opts, :url def initialize @log_tags = [] @@ -29,6 +29,22 @@ module ActionCable Pathname.new(channel_path).basename.to_s.split('.').first.camelize end end + + ADAPTER = ActionCable::StorageAdapter + + # Returns constant of storage adapter specified in config/cable.yml + # If the adapter cannot be found, this will default to the Redis adapter + def storage_adapter + # "ActionCable::StorageAdapter::#{adapter.capitalize}" + adapter = config_opts['adapter'] + adapter_const = "ActionCable::StorageAdapter::#{adapter.capitalize}" + + if Object.const_defined?(adapter_const) + adapter_const.constantize + else + ADAPTER_BASE::Redis + end + end end end end diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb new file mode 100644 index 0000000000..991270d2b3 --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter.rb @@ -0,0 +1,6 @@ +module ActionCable + module StorageAdapter + autoload :Base, 'action_cable/storage_adapter/base' + autoload :Redis, 'action_cable/storage_adapter/redis' + end +end diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb new file mode 100644 index 0000000000..26b3ded676 --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter/base.rb @@ -0,0 +1,22 @@ +module ActionCable + module StorageAdapter + class Base + attr_reader :logger, :server + + def initialize(server) + @server = server + @logger = @server.logger + end + + # Storage connection instance used for broadcasting. Not intended for direct user use. + def broadcast + raise NotImplementedError + end + + # Storage connection instance used for pubsub. + def pubsub + raise NotImplementedError + end + end + end +end diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb new file mode 100644 index 0000000000..7b712b9b03 --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -0,0 +1,30 @@ +require 'em-hiredis' +require 'redis' + +module ActionCable + module StorageAdapter + class Redis < Base + # The redis instance used for broadcasting. Not intended for direct user use. + def broadcast + @broadcast ||= ::Redis.new(@server.config.config_opts) + end + + def pubsub + redis.pubsub + end + + private + + # The EventMachine Redis instance used by the pubsub adapter. + def redis + @redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." + # @connections.map &:close + end + end + end + end + end +end |