From 005a99ebf687051df12af96e83b677e4abda9b21 Mon Sep 17 00:00:00 2001 From: Pratik Naik <pratiknaik@gmail.com> Date: Thu, 9 Apr 2015 17:26:18 -0500 Subject: Rename Registry to InternalChannel and remove dup methods --- lib/action_cable/connection.rb | 2 +- lib/action_cable/connection/base.rb | 13 +++-- lib/action_cable/connection/internal_channel.rb | 41 ++++++++++++++++ lib/action_cable/connection/registry.rb | 64 ------------------------- 4 files changed, 52 insertions(+), 68 deletions(-) create mode 100644 lib/action_cable/connection/internal_channel.rb delete mode 100644 lib/action_cable/connection/registry.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 91fc73713c..665a851b11 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,7 +1,7 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' - autoload :Registry, 'action_cable/connection/registry' + autoload :InternalChannel, 'action_cable/connection/internal_channel' autoload :Identifier, 'action_cable/connection/identifier' end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index b2d758a124..0c20f11502 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,10 +1,17 @@ module ActionCable module Connection class Base - include Registry + include InternalChannel, Identifier PING_INTERVAL = 3 + class_attribute :identifiers + self.identifiers = Set.new + + def self.identified_by(*identifiers) + self.identifiers += identifiers + end + attr_reader :env, :server delegate :worker_pool, :pubsub, :logger, to: :server @@ -89,7 +96,7 @@ module ActionCable private def initialize_connection connect if respond_to?(:connect) - register_connection + subscribe_to_internal_channel @accept_messages = true worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? @@ -97,7 +104,7 @@ module ActionCable def on_connection_closed cleanup_subscriptions - cleanup_internal_redis_subscriptions + unsubscribe_from_internal_channel disconnect if respond_to?(:disconnect) end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb new file mode 100644 index 0000000000..745fd99b78 --- /dev/null +++ b/lib/action_cable/connection/internal_channel.rb @@ -0,0 +1,41 @@ +module ActionCable + module Connection + module InternalChannel + extend ActiveSupport::Concern + + def subscribe_to_internal_channel + if connection_identifier.present? + callback = -> (message) { process_internal_message(message) } + @_internal_redis_subscriptions ||= [] + @_internal_redis_subscriptions << [ internal_redis_channel, callback ] + + pubsub.subscribe(internal_redis_channel, &callback) + logger.info "[ActionCable] Registered connection (#{connection_identifier})" + end + end + + def unsubscribe_from_internal_channel + if @_internal_redis_subscriptions.present? + @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } + end + end + + private + def process_internal_message(message) + message = ActiveSupport::JSON.decode(message) + + case message['type'] + when 'disconnect' + logger.info "[ActionCable] Removing connection (#{connection_identifier})" + @websocket.close + end + rescue Exception => e + logger.error "[ActionCable] There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + handle_exception + end + + end + end +end diff --git a/lib/action_cable/connection/registry.rb b/lib/action_cable/connection/registry.rb deleted file mode 100644 index 03a0bf4fe9..0000000000 --- a/lib/action_cable/connection/registry.rb +++ /dev/null @@ -1,64 +0,0 @@ -module ActionCable - module Connection - module Registry - extend ActiveSupport::Concern - - included do - class_attribute :identifiers - self.identifiers = Set.new - end - - module ClassMethods - def identified_by(*identifiers) - self.identifiers += identifiers - end - end - - def register_connection - if connection_identifier.present? - callback = -> (message) { process_registry_message(message) } - @_internal_redis_subscriptions ||= [] - @_internal_redis_subscriptions << [ internal_redis_channel, callback ] - - pubsub.subscribe(internal_redis_channel, &callback) - logger.info "[ActionCable] Registered connection (#{connection_identifier})" - end - end - - def internal_redis_channel - "action_cable/#{connection_identifier}" - end - - def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")} - end - - def connection_gid(ids) - ids.map {|o| o.to_global_id.to_s }.sort.join(":") - end - - def cleanup_internal_redis_subscriptions - if @_internal_redis_subscriptions.present? - @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } - end - end - - private - def process_registry_message(message) - message = ActiveSupport::JSON.decode(message) - - case message['type'] - when 'disconnect' - logger.info "[ActionCable] Removing connection (#{connection_identifier})" - @websocket.close - end - rescue Exception => e - logger.error "[ActionCable] There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") - - handle_exception - end - - end - end -end -- cgit v1.2.3