From 354018bf9b5f5bf0fbbc6e6efddc719e7523b39d Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Sat, 4 Apr 2015 00:26:14 -0500 Subject: Separate connection and server classes --- lib/action_cable/connection.rb | 133 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 lib/action_cable/connection.rb (limited to 'lib/action_cable/connection.rb') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb new file mode 100644 index 0000000000..00fb8ca817 --- /dev/null +++ b/lib/action_cable/connection.rb @@ -0,0 +1,133 @@ +module ActionCable + class Connection + PING_INTERVAL = 3 + + attr_reader :env, :server + delegate :worker_pool, :pubsub, :logger, to: :server + + def initialize(server, env) + @server = server + @env = env + @accept_messages = false + @pending_messages = [] + end + + def process + if Faye::WebSocket.websocket?(@env) + @subscriptions = {} + + @websocket = Faye::WebSocket.new(@env) + + @websocket.on(:open) do |event| + broadcast_ping_timestamp + @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } + worker_pool.async.invoke(self, :initialize_client) + end + + @websocket.on(:message) do |event| + message = event.data + + if message.is_a?(String) + if @accept_messages + worker_pool.async.invoke(self, :received_data, message) + else + @pending_messages << message + end + end + end + + @websocket.on(:close) do |event| + worker_pool.async.invoke(self, :cleanup_subscriptions) + worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) + + EventMachine.cancel_timer(@ping_timer) if @ping_timer + end + + @websocket.rack_response + else + invalid_request + end + end + + def received_data(data) + return unless websocket_alive? + + data = ActiveSupport::JSON.decode data + + case data['action'] + when 'subscribe' + subscribe_channel(data) + when 'unsubscribe' + unsubscribe_channel(data) + when 'message' + process_message(data) + end + end + + def cleanup_subscriptions + @subscriptions.each do |id, channel| + channel.unsubscribe + end + end + + def broadcast(data) + logger.info "Sending data: #{data}" + @websocket.send data + end + + def handle_exception + logger.error "[ActionCable] Closing connection" + + @websocket.close + end + + private + def initialize_client + connect if respond_to?(:connect) + @accept_messages = true + + worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? + end + + def broadcast_ping_timestamp + broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + + def subscribe_channel(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } + + if subscription_klass + logger.info "Subscribing to channel: #{id_key}" + @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) + else + logger.error "Unable to subscribe to channel: #{id_key}" + end + end + + def process_message(message) + if @subscriptions[message['identifier']] + @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) + else + logger.error "Unable to process message: #{message}" + end + end + + def unsubscribe_channel(data) + logger.info "Unsubscribing from channel: #{data['identifier']}" + @subscriptions[data['identifier']].unsubscribe + @subscriptions.delete(data['identifier']) + end + + def invalid_request + [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + end + + def websocket_alive? + @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN + end + + end +end -- cgit v1.2.3 From eec92d0229a7bceb62d49d58c70b5629fe140d7f Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 12:21:22 -0500 Subject: Add connection identifier and an internal redis channel --- lib/action_cable/connection.rb | 133 +---------------------------------------- 1 file changed, 3 insertions(+), 130 deletions(-) (limited to 'lib/action_cable/connection.rb') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 00fb8ca817..102903c6ef 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,133 +1,6 @@ module ActionCable - class Connection - PING_INTERVAL = 3 - - attr_reader :env, :server - delegate :worker_pool, :pubsub, :logger, to: :server - - def initialize(server, env) - @server = server - @env = env - @accept_messages = false - @pending_messages = [] - end - - def process - if Faye::WebSocket.websocket?(@env) - @subscriptions = {} - - @websocket = Faye::WebSocket.new(@env) - - @websocket.on(:open) do |event| - broadcast_ping_timestamp - @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } - worker_pool.async.invoke(self, :initialize_client) - end - - @websocket.on(:message) do |event| - message = event.data - - if message.is_a?(String) - if @accept_messages - worker_pool.async.invoke(self, :received_data, message) - else - @pending_messages << message - end - end - end - - @websocket.on(:close) do |event| - worker_pool.async.invoke(self, :cleanup_subscriptions) - worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) - - EventMachine.cancel_timer(@ping_timer) if @ping_timer - end - - @websocket.rack_response - else - invalid_request - end - end - - def received_data(data) - return unless websocket_alive? - - data = ActiveSupport::JSON.decode data - - case data['action'] - when 'subscribe' - subscribe_channel(data) - when 'unsubscribe' - unsubscribe_channel(data) - when 'message' - process_message(data) - end - end - - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.unsubscribe - end - end - - def broadcast(data) - logger.info "Sending data: #{data}" - @websocket.send data - end - - def handle_exception - logger.error "[ActionCable] Closing connection" - - @websocket.close - end - - private - def initialize_client - connect if respond_to?(:connect) - @accept_messages = true - - worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? - end - - def broadcast_ping_timestamp - broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - - subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } - - if subscription_klass - logger.info "Subscribing to channel: #{id_key}" - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Unable to subscribe to channel: #{id_key}" - end - end - - def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) - else - logger.error "Unable to process message: #{message}" - end - end - - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].unsubscribe - @subscriptions.delete(data['identifier']) - end - - def invalid_request - [404, {'Content-Type' => 'text/plain'}, ['Page not found']] - end - - def websocket_alive? - @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN - end - + module Connection + autoload :Base, 'action_cable/connection/base' + autoload :Registry, 'action_cable/connection/Registry' end end -- cgit v1.2.3 From 1c9d82dbf0e743534c5fa9be936eaa46c5b07523 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 13:17:02 -0500 Subject: Add remote connection to talk over internal redis channel --- lib/action_cable/connection.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'lib/action_cable/connection.rb') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 102903c6ef..91fc73713c 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,6 +1,7 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' - autoload :Registry, 'action_cable/connection/Registry' + autoload :Registry, 'action_cable/connection/registry' + autoload :Identifier, 'action_cable/connection/identifier' end end -- cgit v1.2.3