From 1c9d82dbf0e743534c5fa9be936eaa46c5b07523 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 13:17:02 -0500 Subject: Add remote connection to talk over internal redis channel --- lib/action_cable/connection.rb | 3 ++- lib/action_cable/connection/identifier.rb | 19 ++++++++++++++++ lib/action_cable/connection_proxy.rb | 14 ------------ lib/action_cable/remote_connection.rb | 38 +++++++++++++++++++++++++++++++ lib/action_cable/server.rb | 12 ++++++++-- 5 files changed, 69 insertions(+), 17 deletions(-) create mode 100644 lib/action_cable/connection/identifier.rb delete mode 100644 lib/action_cable/connection_proxy.rb create mode 100644 lib/action_cable/remote_connection.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 102903c6ef..91fc73713c 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,6 +1,7 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' - autoload :Registry, 'action_cable/connection/Registry' + autoload :Registry, 'action_cable/connection/registry' + autoload :Identifier, 'action_cable/connection/identifier' end end diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb new file mode 100644 index 0000000000..9bfd773ab1 --- /dev/null +++ b/lib/action_cable/connection/identifier.rb @@ -0,0 +1,19 @@ +module ActionCable + module Connection + module Identifier + + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")} + end + + def connection_gid(ids) + ids.map {|o| o.to_global_id.to_s }.sort.join(":") + end + + end + end +end diff --git a/lib/action_cable/connection_proxy.rb b/lib/action_cable/connection_proxy.rb deleted file mode 100644 index 980e037ff3..0000000000 --- a/lib/action_cable/connection_proxy.rb +++ /dev/null @@ -1,14 +0,0 @@ -module ActionCable - module ConnectionProxy - class << self - def active - end - - def where(identification) - end - end - - def disconnect - end - end -end diff --git a/lib/action_cable/remote_connection.rb b/lib/action_cable/remote_connection.rb new file mode 100644 index 0000000000..e2cb2d932c --- /dev/null +++ b/lib/action_cable/remote_connection.rb @@ -0,0 +1,38 @@ +module ActionCable + class RemoteConnection + class InvalidIdentifiersError < StandardError; end + + include Connection::Identifier + + delegate :redis, to: :server + + def initialize(server, ids) + @server = server + set_identifier_instance_vars(ids) + end + + def disconnect + message = { type: 'disconnect' }.to_json + redis.publish(internal_redis_channel, message) + end + + def identifiers + @server.connection_identifiers + end + + def redis + @redis ||= Redis.new(@server.redis_config) + end + + private + def set_identifier_instance_vars(ids) + raise InvalidIdentifiersError unless valid_identifiers?(ids) + ids.each { |k,v| instance_variable_set("@#{k}", v) } + end + + def valid_identifiers?(ids) + keys = ids.keys + identifiers.all? { |id| keys.include?(id) } + end + end +end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 6e9265dc06..51e246c232 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -2,12 +2,12 @@ module ActionCable class Server cattr_accessor(:logger, instance_reader: true) { Rails.logger } - attr_accessor :registered_channels, :worker_pool + attr_accessor :registered_channels, :redis_config def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection) @redis_config = redis_config @registered_channels = Set.new(channels) - @worker_pool = ActionCable::Worker.pool(size: worker_pool_size) + @worker_pool_size = worker_pool_size @connection_class = connection end @@ -15,9 +15,17 @@ module ActionCable @connection_class.new(self, env).process end + def worker_pool + @worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size) + end + def pubsub @pubsub ||= EM::Hiredis.connect(@redis_config['url']).pubsub end + def connection_identifiers + @connection_class.identifiers + end + end end -- cgit v1.2.3