diff options
author | Pratik Naik <pratiknaik@gmail.com> | 2015-04-07 10:11:46 -0500 |
---|---|---|
committer | Pratik Naik <pratiknaik@gmail.com> | 2015-04-07 10:11:46 -0500 |
commit | 6127d0cda4d60c7b30ee1fb40006da11e04512d0 (patch) | |
tree | c6905ea6b5414a64a687dea12cb0ea4631d2e0a1 /lib/action_cable/connection | |
parent | bdbbe18f3cc527b121bbb2f402898caf4c2fbb15 (diff) | |
parent | fb797ad1f1c3b0d96968c5feef783a2b8fe07eed (diff) | |
download | rails-6127d0cda4d60c7b30ee1fb40006da11e04512d0.tar.gz rails-6127d0cda4d60c7b30ee1fb40006da11e04512d0.tar.bz2 rails-6127d0cda4d60c7b30ee1fb40006da11e04512d0.zip |
Merge branch 'connection-management'
Diffstat (limited to 'lib/action_cable/connection')
-rw-r--r-- | lib/action_cable/connection/base.rb | 145 | ||||
-rw-r--r-- | lib/action_cable/connection/identifier.rb | 19 | ||||
-rw-r--r-- | lib/action_cable/connection/registry.rb | 64 |
3 files changed, 228 insertions, 0 deletions
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb new file mode 100644 index 0000000000..4ad1e7d065 --- /dev/null +++ b/lib/action_cable/connection/base.rb @@ -0,0 +1,145 @@ +module ActionCable + module Connection + class Base + include Registry + + PING_INTERVAL = 3 + + attr_reader :env, :server + delegate :worker_pool, :pubsub, :logger, to: :server + + def initialize(server, env) + @server = server + @env = env + @accept_messages = false + @pending_messages = [] + end + + def process + if Faye::WebSocket.websocket?(@env) + @subscriptions = {} + + @websocket = Faye::WebSocket.new(@env) + + @websocket.on(:open) do |event| + broadcast_ping_timestamp + @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } + worker_pool.async.invoke(self, :initialize_client) + end + + @websocket.on(:message) do |event| + message = event.data + + if message.is_a?(String) + if @accept_messages + worker_pool.async.invoke(self, :received_data, message) + else + @pending_messages << message + end + end + end + + @websocket.on(:close) do |event| + worker_pool.async.invoke(self, :cleanup_subscriptions) + worker_pool.async.invoke(self, :cleanup_internal_redis_subscriptions) + worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) + + EventMachine.cancel_timer(@ping_timer) if @ping_timer + end + + @websocket.rack_response + else + invalid_request + end + end + + def received_data(data) + return unless websocket_alive? + + data = ActiveSupport::JSON.decode data + + case data['action'] + when 'subscribe' + subscribe_channel(data) + when 'unsubscribe' + unsubscribe_channel(data) + when 'message' + process_message(data) + end + end + + def cleanup_subscriptions + @subscriptions.each do |id, channel| + channel.unsubscribe + end + end + + def broadcast(data) + logger.info "Sending data: #{data}" + @websocket.send data + end + + def handle_exception + logger.error "[ActionCable] Closing connection" + + @websocket.close + end + + private + def initialize_client + connect if respond_to?(:connect) + register_connection + + @accept_messages = true + worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? + end + + def broadcast_ping_timestamp + broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + + def subscribe_channel(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } + + if subscription_klass + logger.info "[ActionCable] Subscribing to channel: #{id_key}" + @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) + else + logger.error "[ActionCable] Subscription class not found (#{data.inspect})" + end + rescue Exception => e + logger.error "[ActionCable] Could not subscribe to channel (#{data.inspect})" + logger.error e.backtrace.join("\n") + end + + def process_message(message) + if @subscriptions[message['identifier']] + @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) + else + logger.error "[ActionCable] Unable to process message because no subscription was found (#{message.inspect})" + end + rescue Exception => e + logger.error "[ActionCable] Could not process message (#{data.inspect})" + logger.error e.backtrace.join("\n") + end + + def unsubscribe_channel(data) + logger.info "[ActionCable] Unsubscribing from channel: #{data['identifier']}" + @subscriptions[data['identifier']].unsubscribe + @subscriptions.delete(data['identifier']) + end + + def invalid_request + [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + end + + def websocket_alive? + @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN + end + + end + end +end diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb new file mode 100644 index 0000000000..9bfd773ab1 --- /dev/null +++ b/lib/action_cable/connection/identifier.rb @@ -0,0 +1,19 @@ +module ActionCable + module Connection + module Identifier + + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")} + end + + def connection_gid(ids) + ids.map {|o| o.to_global_id.to_s }.sort.join(":") + end + + end + end +end diff --git a/lib/action_cable/connection/registry.rb b/lib/action_cable/connection/registry.rb new file mode 100644 index 0000000000..03a0bf4fe9 --- /dev/null +++ b/lib/action_cable/connection/registry.rb @@ -0,0 +1,64 @@ +module ActionCable + module Connection + module Registry + extend ActiveSupport::Concern + + included do + class_attribute :identifiers + self.identifiers = Set.new + end + + module ClassMethods + def identified_by(*identifiers) + self.identifiers += identifiers + end + end + + def register_connection + if connection_identifier.present? + callback = -> (message) { process_registry_message(message) } + @_internal_redis_subscriptions ||= [] + @_internal_redis_subscriptions << [ internal_redis_channel, callback ] + + pubsub.subscribe(internal_redis_channel, &callback) + logger.info "[ActionCable] Registered connection (#{connection_identifier})" + end + end + + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")} + end + + def connection_gid(ids) + ids.map {|o| o.to_global_id.to_s }.sort.join(":") + end + + def cleanup_internal_redis_subscriptions + if @_internal_redis_subscriptions.present? + @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } + end + end + + private + def process_registry_message(message) + message = ActiveSupport::JSON.decode(message) + + case message['type'] + when 'disconnect' + logger.info "[ActionCable] Removing connection (#{connection_identifier})" + @websocket.close + end + rescue Exception => e + logger.error "[ActionCable] There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + handle_exception + end + + end + end +end |