diff options
Diffstat (limited to 'lib/action_cable')
-rw-r--r-- | lib/action_cable/connection.rb | 2 | ||||
-rw-r--r-- | lib/action_cable/connection/base.rb | 13 | ||||
-rw-r--r-- | lib/action_cable/connection/internal_channel.rb (renamed from lib/action_cable/connection/registry.rb) | 33 |
3 files changed, 16 insertions, 32 deletions
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 91fc73713c..665a851b11 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,7 +1,7 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' - autoload :Registry, 'action_cable/connection/registry' + autoload :InternalChannel, 'action_cable/connection/internal_channel' autoload :Identifier, 'action_cable/connection/identifier' end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index b2d758a124..0c20f11502 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,10 +1,17 @@ module ActionCable module Connection class Base - include Registry + include InternalChannel, Identifier PING_INTERVAL = 3 + class_attribute :identifiers + self.identifiers = Set.new + + def self.identified_by(*identifiers) + self.identifiers += identifiers + end + attr_reader :env, :server delegate :worker_pool, :pubsub, :logger, to: :server @@ -89,7 +96,7 @@ module ActionCable private def initialize_connection connect if respond_to?(:connect) - register_connection + subscribe_to_internal_channel @accept_messages = true worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? @@ -97,7 +104,7 @@ module ActionCable def on_connection_closed cleanup_subscriptions - cleanup_internal_redis_subscriptions + unsubscribe_from_internal_channel disconnect if respond_to?(:disconnect) end diff --git a/lib/action_cable/connection/registry.rb b/lib/action_cable/connection/internal_channel.rb index 03a0bf4fe9..745fd99b78 100644 --- a/lib/action_cable/connection/registry.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -1,22 +1,11 @@ module ActionCable module Connection - module Registry + module InternalChannel extend ActiveSupport::Concern - included do - class_attribute :identifiers - self.identifiers = Set.new - end - - module ClassMethods - def identified_by(*identifiers) - self.identifiers += identifiers - end - end - - def register_connection + def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_registry_message(message) } + callback = -> (message) { process_internal_message(message) } @_internal_redis_subscriptions ||= [] @_internal_redis_subscriptions << [ internal_redis_channel, callback ] @@ -25,26 +14,14 @@ module ActionCable end end - def internal_redis_channel - "action_cable/#{connection_identifier}" - end - - def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")} - end - - def connection_gid(ids) - ids.map {|o| o.to_global_id.to_s }.sort.join(":") - end - - def cleanup_internal_redis_subscriptions + def unsubscribe_from_internal_channel if @_internal_redis_subscriptions.present? @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } end end private - def process_registry_message(message) + def process_internal_message(message) message = ActiveSupport::JSON.decode(message) case message['type'] |