aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPratik Naik <pratiknaik@gmail.com>2015-04-07 10:11:46 -0500
committerPratik Naik <pratiknaik@gmail.com>2015-04-07 10:11:46 -0500
commit6127d0cda4d60c7b30ee1fb40006da11e04512d0 (patch)
treec6905ea6b5414a64a687dea12cb0ea4631d2e0a1
parentbdbbe18f3cc527b121bbb2f402898caf4c2fbb15 (diff)
parentfb797ad1f1c3b0d96968c5feef783a2b8fe07eed (diff)
downloadrails-6127d0cda4d60c7b30ee1fb40006da11e04512d0.tar.gz
rails-6127d0cda4d60c7b30ee1fb40006da11e04512d0.tar.bz2
rails-6127d0cda4d60c7b30ee1fb40006da11e04512d0.zip
Merge branch 'connection-management'
-rw-r--r--Gemfile.lock9
-rw-r--r--action_cable.gemspec2
-rw-r--r--lib/action_cable.rb5
-rw-r--r--lib/action_cable/channel/redis.rb7
-rw-r--r--lib/action_cable/connection.rb7
-rw-r--r--lib/action_cable/connection/base.rb145
-rw-r--r--lib/action_cable/connection/identifier.rb19
-rw-r--r--lib/action_cable/connection/registry.rb64
-rw-r--r--lib/action_cable/remote_connection.rb38
-rw-r--r--lib/action_cable/remote_connections.rb13
-rw-r--r--lib/action_cable/server.rb154
11 files changed, 317 insertions, 146 deletions
diff --git a/Gemfile.lock b/Gemfile.lock
index e767e58784..6fcf4aa39f 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -4,6 +4,7 @@ PATH
action_cable (0.0.2)
activesupport (>= 4.2.0)
celluloid (~> 0.16.0)
+ em-hiredis (~> 0.3.0)
faye-websocket (~> 0.9.2)
GEM
@@ -17,10 +18,14 @@ GEM
tzinfo (~> 1.1)
celluloid (0.16.0)
timers (~> 4.0.0)
+ em-hiredis (0.3.0)
+ eventmachine (~> 1.0)
+ hiredis (~> 0.5.0)
eventmachine (1.0.7)
faye-websocket (0.9.2)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
+ hiredis (0.5.2)
hitimes (1.2.2)
i18n (0.7.0)
json (1.8.2)
@@ -34,9 +39,9 @@ GEM
hitimes
tzinfo (1.2.2)
thread_safe (~> 0.1)
- websocket-driver (0.5.1)
+ websocket-driver (0.5.3)
websocket-extensions (>= 0.1.0)
- websocket-extensions (0.1.1)
+ websocket-extensions (0.1.2)
PLATFORMS
ruby
diff --git a/action_cable.gemspec b/action_cable.gemspec
index 1dade2a394..4f252d4b1e 100644
--- a/action_cable.gemspec
+++ b/action_cable.gemspec
@@ -12,6 +12,8 @@ Gem::Specification.new do |s|
s.add_dependency('activesupport', '>= 4.2.0')
s.add_dependency('faye-websocket', '~> 0.9.2')
s.add_dependency('celluloid', '~> 0.16.0')
+ s.add_dependency('em-hiredis', '~> 0.3.0')
+ s.add_dependency('redis', '~> 3.0')
s.files = Dir['README', 'lib/**/*']
s.has_rdoc = false
diff --git a/lib/action_cable.rb b/lib/action_cable.rb
index 0681b8bdde..3352453491 100644
--- a/lib/action_cable.rb
+++ b/lib/action_cable.rb
@@ -7,10 +7,12 @@ require 'active_support'
require 'active_support/json'
require 'active_support/concern'
require 'active_support/core_ext/hash/indifferent_access'
+require 'active_support/core_ext/module/delegation'
require 'active_support/callbacks'
require 'faye/websocket'
require 'celluloid'
+require 'em-hiredis'
require 'action_cable/engine' if defined?(Rails)
@@ -20,4 +22,7 @@ module ActionCable
autoload :Channel, 'action_cable/channel'
autoload :Worker, 'action_cable/worker'
autoload :Server, 'action_cable/server'
+ autoload :Connection, 'action_cable/connection'
+ autoload :RemoteConnection, 'action_cable/remote_connection'
+ autoload :RemoteConnections, 'action_cable/remote_connections'
end
diff --git a/lib/action_cable/channel/redis.rb b/lib/action_cable/channel/redis.rb
index bdbd3c95b1..2691a3b145 100644
--- a/lib/action_cable/channel/redis.rb
+++ b/lib/action_cable/channel/redis.rb
@@ -6,11 +6,10 @@ module ActionCable
included do
on_unsubscribe :unsubscribe_from_redis_channels
+ delegate :pubsub, to: :connection
end
def subscribe_to(redis_channel, callback = nil)
- raise "`ActionCable::Server.pubsub` class method is not defined" unless connection.class.respond_to?(:pubsub)
-
callback ||= -> (message) { broadcast ActiveSupport::JSON.decode(message) }
@_redis_channels ||= []
@_redis_channels << [ redis_channel, callback ]
@@ -24,10 +23,6 @@ module ActionCable
@_redis_channels.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) }
end
end
-
- def pubsub
- connection.class.pubsub
- end
end
end
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb
new file mode 100644
index 0000000000..91fc73713c
--- /dev/null
+++ b/lib/action_cable/connection.rb
@@ -0,0 +1,7 @@
+module ActionCable
+ module Connection
+ autoload :Base, 'action_cable/connection/base'
+ autoload :Registry, 'action_cable/connection/registry'
+ autoload :Identifier, 'action_cable/connection/identifier'
+ end
+end
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
new file mode 100644
index 0000000000..4ad1e7d065
--- /dev/null
+++ b/lib/action_cable/connection/base.rb
@@ -0,0 +1,145 @@
+module ActionCable
+ module Connection
+ class Base
+ include Registry
+
+ PING_INTERVAL = 3
+
+ attr_reader :env, :server
+ delegate :worker_pool, :pubsub, :logger, to: :server
+
+ def initialize(server, env)
+ @server = server
+ @env = env
+ @accept_messages = false
+ @pending_messages = []
+ end
+
+ def process
+ if Faye::WebSocket.websocket?(@env)
+ @subscriptions = {}
+
+ @websocket = Faye::WebSocket.new(@env)
+
+ @websocket.on(:open) do |event|
+ broadcast_ping_timestamp
+ @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
+ worker_pool.async.invoke(self, :initialize_client)
+ end
+
+ @websocket.on(:message) do |event|
+ message = event.data
+
+ if message.is_a?(String)
+ if @accept_messages
+ worker_pool.async.invoke(self, :received_data, message)
+ else
+ @pending_messages << message
+ end
+ end
+ end
+
+ @websocket.on(:close) do |event|
+ worker_pool.async.invoke(self, :cleanup_subscriptions)
+ worker_pool.async.invoke(self, :cleanup_internal_redis_subscriptions)
+ worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)
+
+ EventMachine.cancel_timer(@ping_timer) if @ping_timer
+ end
+
+ @websocket.rack_response
+ else
+ invalid_request
+ end
+ end
+
+ def received_data(data)
+ return unless websocket_alive?
+
+ data = ActiveSupport::JSON.decode data
+
+ case data['action']
+ when 'subscribe'
+ subscribe_channel(data)
+ when 'unsubscribe'
+ unsubscribe_channel(data)
+ when 'message'
+ process_message(data)
+ end
+ end
+
+ def cleanup_subscriptions
+ @subscriptions.each do |id, channel|
+ channel.unsubscribe
+ end
+ end
+
+ def broadcast(data)
+ logger.info "Sending data: #{data}"
+ @websocket.send data
+ end
+
+ def handle_exception
+ logger.error "[ActionCable] Closing connection"
+
+ @websocket.close
+ end
+
+ private
+ def initialize_client
+ connect if respond_to?(:connect)
+ register_connection
+
+ @accept_messages = true
+ worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty?
+ end
+
+ def broadcast_ping_timestamp
+ broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
+ end
+
+ def subscribe_channel(data)
+ id_key = data['identifier']
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+
+ subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] }
+
+ if subscription_klass
+ logger.info "[ActionCable] Subscribing to channel: #{id_key}"
+ @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
+ else
+ logger.error "[ActionCable] Subscription class not found (#{data.inspect})"
+ end
+ rescue Exception => e
+ logger.error "[ActionCable] Could not subscribe to channel (#{data.inspect})"
+ logger.error e.backtrace.join("\n")
+ end
+
+ def process_message(message)
+ if @subscriptions[message['identifier']]
+ @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data'])
+ else
+ logger.error "[ActionCable] Unable to process message because no subscription was found (#{message.inspect})"
+ end
+ rescue Exception => e
+ logger.error "[ActionCable] Could not process message (#{data.inspect})"
+ logger.error e.backtrace.join("\n")
+ end
+
+ def unsubscribe_channel(data)
+ logger.info "[ActionCable] Unsubscribing from channel: #{data['identifier']}"
+ @subscriptions[data['identifier']].unsubscribe
+ @subscriptions.delete(data['identifier'])
+ end
+
+ def invalid_request
+ [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
+ end
+
+ def websocket_alive?
+ @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ end
+ end
+end
diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb
new file mode 100644
index 0000000000..9bfd773ab1
--- /dev/null
+++ b/lib/action_cable/connection/identifier.rb
@@ -0,0 +1,19 @@
+module ActionCable
+ module Connection
+ module Identifier
+
+ def internal_redis_channel
+ "action_cable/#{connection_identifier}"
+ end
+
+ def connection_identifier
+ @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")}
+ end
+
+ def connection_gid(ids)
+ ids.map {|o| o.to_global_id.to_s }.sort.join(":")
+ end
+
+ end
+ end
+end
diff --git a/lib/action_cable/connection/registry.rb b/lib/action_cable/connection/registry.rb
new file mode 100644
index 0000000000..03a0bf4fe9
--- /dev/null
+++ b/lib/action_cable/connection/registry.rb
@@ -0,0 +1,64 @@
+module ActionCable
+ module Connection
+ module Registry
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :identifiers
+ self.identifiers = Set.new
+ end
+
+ module ClassMethods
+ def identified_by(*identifiers)
+ self.identifiers += identifiers
+ end
+ end
+
+ def register_connection
+ if connection_identifier.present?
+ callback = -> (message) { process_registry_message(message) }
+ @_internal_redis_subscriptions ||= []
+ @_internal_redis_subscriptions << [ internal_redis_channel, callback ]
+
+ pubsub.subscribe(internal_redis_channel, &callback)
+ logger.info "[ActionCable] Registered connection (#{connection_identifier})"
+ end
+ end
+
+ def internal_redis_channel
+ "action_cable/#{connection_identifier}"
+ end
+
+ def connection_identifier
+ @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")}
+ end
+
+ def connection_gid(ids)
+ ids.map {|o| o.to_global_id.to_s }.sort.join(":")
+ end
+
+ def cleanup_internal_redis_subscriptions
+ if @_internal_redis_subscriptions.present?
+ @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) }
+ end
+ end
+
+ private
+ def process_registry_message(message)
+ message = ActiveSupport::JSON.decode(message)
+
+ case message['type']
+ when 'disconnect'
+ logger.info "[ActionCable] Removing connection (#{connection_identifier})"
+ @websocket.close
+ end
+ rescue Exception => e
+ logger.error "[ActionCable] There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
+
+ handle_exception
+ end
+
+ end
+ end
+end
diff --git a/lib/action_cable/remote_connection.rb b/lib/action_cable/remote_connection.rb
new file mode 100644
index 0000000000..e2cb2d932c
--- /dev/null
+++ b/lib/action_cable/remote_connection.rb
@@ -0,0 +1,38 @@
+module ActionCable
+ class RemoteConnection
+ class InvalidIdentifiersError < StandardError; end
+
+ include Connection::Identifier
+
+ delegate :redis, to: :server
+
+ def initialize(server, ids)
+ @server = server
+ set_identifier_instance_vars(ids)
+ end
+
+ def disconnect
+ message = { type: 'disconnect' }.to_json
+ redis.publish(internal_redis_channel, message)
+ end
+
+ def identifiers
+ @server.connection_identifiers
+ end
+
+ def redis
+ @redis ||= Redis.new(@server.redis_config)
+ end
+
+ private
+ def set_identifier_instance_vars(ids)
+ raise InvalidIdentifiersError unless valid_identifiers?(ids)
+ ids.each { |k,v| instance_variable_set("@#{k}", v) }
+ end
+
+ def valid_identifiers?(ids)
+ keys = ids.keys
+ identifiers.all? { |id| keys.include?(id) }
+ end
+ end
+end
diff --git a/lib/action_cable/remote_connections.rb b/lib/action_cable/remote_connections.rb
new file mode 100644
index 0000000000..f9d7c49a27
--- /dev/null
+++ b/lib/action_cable/remote_connections.rb
@@ -0,0 +1,13 @@
+module ActionCable
+ class RemoteConnections
+ attr_reader :server
+
+ def initialize(server)
+ @server = server
+ end
+
+ def where(identifier)
+ RemoteConnection.new(server, identifier)
+ end
+ end
+end
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index ebf98171c1..222c77fd51 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -1,157 +1,35 @@
module ActionCable
class Server
- class_attribute :registered_channels
- self.registered_channels = Set.new
-
- class_attribute :worker_pool_size
- self.worker_pool_size = 100
-
cattr_accessor(:logger, instance_reader: true) { Rails.logger }
- PING_INTERVAL = 3
-
- class << self
- def register_channels(*channel_classes)
- self.registered_channels += channel_classes
- end
-
- def call(env)
- new(env).process
- end
+ attr_accessor :registered_channels, :redis_config
- def worker_pool
- @worker_pool ||= ActionCable::Worker.pool(size: worker_pool_size)
- end
+ def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection)
+ @redis_config = redis_config
+ @registered_channels = Set.new(channels)
+ @worker_pool_size = worker_pool_size
+ @connection_class = connection
end
- attr_reader :env
-
- def initialize(env)
- @env = env
- @accept_messages = false
- @pending_messages = []
- end
-
- def process
- if Faye::WebSocket.websocket?(@env)
- @subscriptions = {}
-
- @websocket = Faye::WebSocket.new(@env)
-
- @websocket.on(:open) do |event|
- broadcast_ping_timestamp
- @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
- worker_pool.async.invoke(self, :initialize_client)
- end
-
- @websocket.on(:message) do |event|
- message = event.data
-
- if message.is_a?(String)
- if @accept_messages
- worker_pool.async.invoke(self, :received_data, message)
- else
- @pending_messages << message
- end
- end
- end
-
- @websocket.on(:close) do |event|
- worker_pool.async.invoke(self, :cleanup_subscriptions)
- worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)
-
- EventMachine.cancel_timer(@ping_timer) if @ping_timer
- end
-
- @websocket.rack_response
- else
- invalid_request
- end
+ def call(env)
+ @connection_class.new(self, env).process
end
- def received_data(data)
- return unless websocket_alive?
-
- data = ActiveSupport::JSON.decode data
-
- case data['action']
- when 'subscribe'
- subscribe_channel(data)
- when 'unsubscribe'
- unsubscribe_channel(data)
- when 'message'
- process_message(data)
- end
- end
-
- def cleanup_subscriptions
- @subscriptions.each do |id, channel|
- channel.unsubscribe
- end
+ def worker_pool
+ @worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size)
end
- def broadcast(data)
- logger.info "Sending data: #{data}"
- @websocket.send data
+ def pubsub
+ @pubsub ||= EM::Hiredis.connect(@redis_config['url']).pubsub
end
- def worker_pool
- self.class.worker_pool
+ def remote_connections
+ @remote_connections ||= RemoteConnections.new(self)
end
- def handle_exception
- logger.error "[ActionCable] Closing connection"
-
- @websocket.close
+ def connection_identifiers
+ @connection_class.identifiers
end
- private
- def initialize_client
- connect if respond_to?(:connect)
- @accept_messages = true
-
- worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty?
- end
-
- def broadcast_ping_timestamp
- broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
-
- def subscribe_channel(data)
- id_key = data['identifier']
- id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
-
- subscription_klass = registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] }
-
- if subscription_klass
- logger.info "Subscribing to channel: #{id_key}"
- @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
- else
- logger.error "Unable to subscribe to channel: #{id_key}"
- end
- end
-
- def process_message(message)
- if @subscriptions[message['identifier']]
- @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data'])
- else
- logger.error "Unable to process message: #{message}"
- end
- end
-
- def unsubscribe_channel(data)
- logger.info "Unsubscribing from channel: #{data['identifier']}"
- @subscriptions[data['identifier']].unsubscribe
- @subscriptions.delete(data['identifier'])
- end
-
- def invalid_request
- [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
- end
-
- def websocket_alive?
- @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
- end
-
end
end