diff options
author | David Heinemeier Hansson <david@loudthinking.com> | 2015-07-04 21:30:48 +0200 |
---|---|---|
committer | David Heinemeier Hansson <david@loudthinking.com> | 2015-07-04 21:30:48 +0200 |
commit | 0de65cf2d8860377d45020a83866073c5fec188c (patch) | |
tree | d39b559162936fd79049595d69098fd7712a8fe3 /lib/action_cable/connection | |
parent | 9886a995f5f0b32d0d400074c48221cb0f6b911e (diff) | |
parent | 5de01033150b70982f23a42670c55348a7371c4b (diff) | |
download | rails-0de65cf2d8860377d45020a83866073c5fec188c.tar.gz rails-0de65cf2d8860377d45020a83866073c5fec188c.tar.bz2 rails-0de65cf2d8860377d45020a83866073c5fec188c.zip |
Merge branch 'master' of github.com:basecamp/action_cable
Diffstat (limited to 'lib/action_cable/connection')
-rw-r--r-- | lib/action_cable/connection/base.rb | 196 | ||||
-rw-r--r-- | lib/action_cable/connection/heartbeat.rb | 27 | ||||
-rw-r--r-- | lib/action_cable/connection/identification.rb | 26 | ||||
-rw-r--r-- | lib/action_cable/connection/identifier.rb | 17 | ||||
-rw-r--r-- | lib/action_cable/connection/internal_channel.rb | 8 | ||||
-rw-r--r-- | lib/action_cable/connection/message_buffer.rb | 51 | ||||
-rw-r--r-- | lib/action_cable/connection/subscriptions.rb | 69 | ||||
-rw-r--r-- | lib/action_cable/connection/tagged_logger_proxy.rb | 4 | ||||
-rw-r--r-- | lib/action_cable/connection/web_socket.rb | 27 |
9 files changed, 270 insertions, 155 deletions
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 89d0844031..69c0db9167 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,113 +1,68 @@ module ActionCable module Connection class Base - include InternalChannel, Identifier + include Identification + include InternalChannel - PING_INTERVAL = 3 - - class_attribute :identifiers - self.identifiers = Set.new - - def self.identified_by(*identifiers) - self.identifiers += identifiers - end - - attr_reader :env, :server, :logger + attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server + attr_reader :logger + def initialize(server, env) - @started_at = Time.now + @server, @env = server, env - @server = server - @env = env - @accept_messages = false - @pending_messages = [] - @subscriptions = {} + @logger = new_tagged_logger - @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + @websocket = ActionCable::Connection::WebSocket.new(env) + @heartbeat = ActionCable::Connection::Heartbeat.new(self) + @subscriptions = ActionCable::Connection::Subscriptions.new(self) + @message_buffer = ActionCable::Connection::MessageBuffer.new(self) + + @started_at = Time.now end def process logger.info started_request_message - if websocket? - @websocket = Faye::WebSocket.new(@env) - - @websocket.on(:open) do |event| - transmit_ping_timestamp - @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp } - worker_pool.async.invoke(self, :initialize_connection) - end - - @websocket.on(:message) do |event| - message = event.data - - if message.is_a?(String) - if @accept_messages - worker_pool.async.invoke(self, :received_data, message) - else - @pending_messages << message - end - end - end - - @websocket.on(:close) do |event| - logger.info finished_request_message - - worker_pool.async.invoke(self, :on_connection_closed) - EventMachine.cancel_timer(@ping_timer) if @ping_timer - end - - @websocket.rack_response + if websocket.possible? + websocket.on(:open) { |event| send_async :on_open } + websocket.on(:message) { |event| on_message event.data } + websocket.on(:close) { |event| send_async :on_close } + + respond_to_successful_request else - invalid_request + respond_to_invalid_request end end - def received_data(data) - return unless websocket_alive? - - data = ActiveSupport::JSON.decode data - - case data['command'] - when 'subscribe' - subscribe_channel(data) - when 'unsubscribe' - unsubscribe_channel(data) - when 'message' - process_message(data) + def receive(data_in_json) + if websocket.alive? + subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else - logger.error "Received unrecognized command in #{data.inspect}" - end - end - - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.perform_disconnection + logger.error "Received data without a live websocket (#{data.inspect})" end end def transmit(data) - @websocket.send data + websocket.transmit data end - def statistics - { - identifier: connection_identifier, - started_at: @started_at, - subscriptions: @subscriptions.keys - } + def close + logger.error "Closing connection" + websocket.close end - def handle_exception - close_connection + + def send_async(method, *arguments) + worker_pool.async.invoke(self, method, *arguments) end - def close_connection - logger.error "Closing connection" - @websocket.close + def statistics + { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } end + protected def request @request ||= ActionDispatch::Request.new(Rails.application.env_config.merge(env)) @@ -117,79 +72,59 @@ module ActionCable request.cookie_jar end - def initialize_connection + + private + attr_reader :websocket + attr_reader :heartbeat, :subscriptions, :message_buffer + + def on_open server.add_connection(self) connect if respond_to?(:connect) subscribe_to_internal_channel + heartbeat.start - @accept_messages = true - worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? + message_buffer.process! end - def on_connection_closed - server.remove_connection(self) - - cleanup_subscriptions - unsubscribe_from_internal_channel - disconnect if respond_to?(:disconnect) + def on_message(message) + message_buffer.append message end - def transmit_ping_timestamp - transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end + def on_close + logger.info finished_request_message - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + server.remove_connection(self) - subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } + subscriptions.cleanup + unsubscribe_from_internal_channel + heartbeat.stop - if subscription_klass - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Subscription class not found (#{data.inspect})" - end - rescue Exception => e - logger.error "Could not subscribe to channel (#{data.inspect})" - log_exception(e) + disconnect if respond_to?(:disconnect) end - def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].perform_action(ActiveSupport::JSON.decode message['data']) - else - raise "Unable to process message because no subscription was found (#{message.inspect})" - end - rescue Exception => e - logger.error "Could not process message (#{message.inspect})" - log_exception(e) - end - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].perform_disconnection - @subscriptions.delete(data['identifier']) + def respond_to_successful_request + websocket.rack_response end - def invalid_request + def respond_to_invalid_request logger.info finished_request_message - [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end - def websocket_alive? - @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN - end - def websocket? - @is_websocket ||= Faye::WebSocket.websocket?(@env) + # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. + def new_tagged_logger + TaggedLoggerProxy.new server.logger, + tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } end def started_request_message 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket? ? ' [Websocket]' : '', + websocket.possible? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end @@ -197,19 +132,10 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket? ? ' [Websocket]' : '', + websocket.possible? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end - - def log_exception(e) - logger.error "There was an exception: #{e.class} - #{e.message}" - logger.error e.backtrace.join("\n") - end - - def log_tags - server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } - end end end end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb new file mode 100644 index 0000000000..47cd937c25 --- /dev/null +++ b/lib/action_cable/connection/heartbeat.rb @@ -0,0 +1,27 @@ +module ActionCable + module Connection + class Heartbeat + BEAT_INTERVAL = 3 + + def initialize(connection) + @connection = connection + end + + def start + beat + @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } + end + + def stop + EventMachine.cancel_timer(@timer) if @timer + end + + private + attr_reader :connection + + def beat + connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + end + end +end
\ No newline at end of file diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb new file mode 100644 index 0000000000..246636198b --- /dev/null +++ b/lib/action_cable/connection/identification.rb @@ -0,0 +1,26 @@ +module ActionCable + module Connection + module Identification + extend ActiveSupport::Concern + + included do + class_attribute :identifiers + self.identifiers = Set.new + end + + class_methods do + def identified_by(*identifiers) + self.identifiers += identifiers + end + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + def connection_gid(ids) + ids.map { |o| o.to_global_id.to_s }.sort.join(":") + end + end + end +end diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb deleted file mode 100644 index a608fc546a..0000000000 --- a/lib/action_cable/connection/identifier.rb +++ /dev/null @@ -1,17 +0,0 @@ -module ActionCable - module Connection - module Identifier - def internal_redis_channel - "action_cable/#{connection_identifier}" - end - - def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")}.compact - end - - def connection_gid(ids) - ids.map {|o| o.to_global_id.to_s }.sort.join(":") - end - end - end -end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index 3a11bcaf7b..70e5e58373 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -3,6 +3,10 @@ module ActionCable module InternalChannel extend ActiveSupport::Concern + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + def subscribe_to_internal_channel if connection_identifier.present? callback = -> (message) { process_internal_message(message) } @@ -27,13 +31,13 @@ module ActionCable case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" - @websocket.close + websocket.close end rescue Exception => e logger.error "There was an exception - #{e.class}(#{e.message})" logger.error e.backtrace.join("\n") - handle_exception + close end end end diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb new file mode 100644 index 0000000000..615266e0cb --- /dev/null +++ b/lib/action_cable/connection/message_buffer.rb @@ -0,0 +1,51 @@ +module ActionCable + module Connection + class MessageBuffer + def initialize(connection) + @connection = connection + @buffered_messages = [] + end + + def append(message) + if valid? message + if processing? + receive message + else + buffer message + end + else + connection.logger.error "Couldn't handle non-string message: #{message.class}" + end + end + + def processing? + @processing + end + + def process! + @processing = true + receive_buffered_messages + end + + private + attr_reader :connection + attr_accessor :buffered_messages + + def valid?(message) + message.is_a?(String) + end + + def receive(message) + connection.send_async :receive, message + end + + def buffer(message) + buffered_messages << message + end + + def receive_buffered_messages + receive buffered_messages.shift until buffered_messages.empty? + end + end + end +end
\ No newline at end of file diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb new file mode 100644 index 0000000000..24ab1bdfbf --- /dev/null +++ b/lib/action_cable/connection/subscriptions.rb @@ -0,0 +1,69 @@ +module ActionCable + module Connection + class Subscriptions + def initialize(connection) + @connection = connection + @subscriptions = {} + end + + def execute_command(data) + case data['command'] + when 'subscribe' then add data + when 'unsubscribe' then remove data + when 'message' then perform_action data + else + logger.error "Received unrecognized command in #{data.inspect}" + end + rescue Exception => e + logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}" + end + + def add(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = connection.server.registered_channels.detect do |channel_klass| + channel_klass == id_options[:channel].safe_constantize + end + + if subscription_klass + subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + else + logger.error "Subscription class not found (#{data.inspect})" + end + end + + def remove(data) + logger.info "Unsubscribing from channel: #{data['identifier']}" + subscriptions[data['identifier']].perform_disconnection + subscriptions.delete(data['identifier']) + end + + def perform_action(data) + find(data).perform_action ActiveSupport::JSON.decode(data['data']) + end + + + def identifiers + subscriptions.keys + end + + def cleanup + subscriptions.each { |id, channel| channel.perform_disconnection } + end + + + private + attr_reader :connection, :subscriptions + delegate :logger, to: :connection + + def find(data) + if subscription = subscriptions[data['identifier']] + subscription + else + raise "Unable to find subscription with identifier: #{data['identifier']}" + end + end + end + end +end diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index d99cc2e9a3..e0c0075adf 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -1,7 +1,9 @@ module ActionCable module Connection + # Allows the use of per-connection tags against the server logger. This wouldn't work using the tradional + # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. + # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy - def initialize(logger, tags:) @logger = logger @tags = tags.flatten diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb new file mode 100644 index 0000000000..135a28cfe4 --- /dev/null +++ b/lib/action_cable/connection/web_socket.rb @@ -0,0 +1,27 @@ +module ActionCable + module Connection + # Decorate the Faye::WebSocket with helpers we need. + class WebSocket + delegate :rack_response, :close, :on, to: :websocket + + def initialize(env) + @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + end + + def possible? + websocket + end + + def alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + websocket.send data + end + + private + attr_reader :websocket + end + end +end |