diff options
Diffstat (limited to 'actioncable/lib/action_cable/connection')
8 files changed, 38 insertions, 40 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 43c83b4b78..9a7dfbe761 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -51,8 +51,8 @@ module ActionCable attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol delegate :event_loop, :pubsub, to: :server - def initialize(server, env) - @server, @env = server, env + def initialize(server, env, coder: ActiveSupport::JSON) + @server, @env, @coder = server, env, coder @worker_pool = server.worker_pool @logger = new_tagged_logger @@ -67,7 +67,7 @@ module ActionCable # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. - def process # :nodoc: + def process #:nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -77,20 +77,22 @@ module ActionCable end end - # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded. - # The data is routed to the proper channel that the connection has subscribed to. - def receive(data_in_json) + # Decodes WebSocket messages and dispatches them to subscribed channels. + # WebSocket message transfer encoding is always JSON. + def receive(websocket_message) #:nodoc: + send_async :dispatch_websocket_message, websocket_message + end + + def dispatch_websocket_message(websocket_message) #:nodoc: if websocket.alive? - subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) + subscriptions.execute_command decode(websocket_message) else - logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" + logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})" end end - # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the - # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) # :nodoc: - websocket.transmit data + def transmit(cable_message) # :nodoc: + websocket.transmit encode(cable_message) end # Close the WebSocket connection. @@ -115,7 +117,7 @@ module ActionCable end def beat - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i) + transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i end def on_open # :nodoc: @@ -152,6 +154,14 @@ module ActionCable attr_reader :message_buffer private + def encode(cable_message) + @coder.encode cable_message + end + + def decode(websocket_message) + @coder.decode websocket_message + end + def handle_open @protocol = websocket.protocol connect if respond_to?(:connect) @@ -179,7 +189,7 @@ module ActionCable # Send welcome message to the internal connection monitor channel. # This ensures the connection monitor state is reset after a successful # websocket connection. - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome]) + transmit type: ActionCable::INTERNAL[:message_types][:welcome] end def allow_request_origin? diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index cf529ebd81..6f29f32ea9 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -71,6 +71,8 @@ module ActionCable def write(data) @stream.write(data) + rescue => e + emit_error e.message end def transmit(message) diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb index 09834ca39d..a4bfe7db17 100644 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -41,6 +41,7 @@ module ActionCable @faye.on(:open) { |event| @event_target.on_open } @faye.on(:message) { |event| @event_target.on_message(event.data) } @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + @faye.on(:error) { |event| @event_target.on_error(event.message) } end end end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb index 8b70f3d84e..9c44b38bc3 100644 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -36,7 +36,7 @@ module ActionCable end def shutdown - inner.cancel + @inner.cancel end end end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 3c5d39f59a..f70d52f99b 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -11,7 +11,7 @@ module ActionCable def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message(message) } + callback = -> (message) { process_internal_message decode(message) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] @@ -27,8 +27,6 @@ module ActionCable end def process_internal_message(message) - message = ActiveSupport::JSON.decode(message) - case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 19f2e6e918..6a80770cae 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -30,7 +30,7 @@ module ActionCable protected attr_reader :connection - attr_accessor :buffered_messages + attr_reader :buffered_messages private def valid?(message) @@ -38,7 +38,7 @@ module ActionCable end def receive(message) - connection.send_async :receive, message + connection.receive message end def buffer(message) diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 2d97b28c09..0cf59091bc 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -29,7 +29,7 @@ module ActionCable def write(data) return @rack_hijack_io.write(data) if @rack_hijack_io return @stream_send.call(data) if @stream_send - rescue EOFError + rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 5aa907c2d3..3742f248d1 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -23,13 +23,13 @@ module ActionCable end def add(data) - id_options = decode_hash(data['identifier']) - identifier = normalize_identifier(id_options) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access subscription_klass = connection.server.channel_classes[id_options[:channel]] if subscription_klass - subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options) + subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) else logger.error "Subscription class not found (#{data.inspect})" end @@ -37,7 +37,7 @@ module ActionCable def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - remove_subscription subscriptions[normalize_identifier(data['identifier'])] + remove_subscription subscriptions[data['identifier']] end def remove_subscription(subscription) @@ -46,7 +46,7 @@ module ActionCable end def perform_action(data) - find(data).perform_action(decode_hash(data['data'])) + find(data).perform_action ActiveSupport::JSON.decode(data['data']) end def identifiers @@ -63,21 +63,8 @@ module ActionCable private delegate :logger, to: :connection - def normalize_identifier(identifier) - identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash) - identifier - end - - # If `data` is a Hash, this means that the original JSON - # sent by the client had no backslashes in it, and does - # not need to be decoded again. - def decode_hash(data) - data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash) - data.with_indifferent_access - end - def find(data) - if subscription = subscriptions[normalize_identifier(data['identifier'])] + if subscription = subscriptions[data['identifier']] subscription else raise "Unable to find subscription with identifier: #{data['identifier']}" |