aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb43
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb10
-rw-r--r--actioncable/lib/action_cable/connection/faye_client_socket.rb10
-rw-r--r--actioncable/lib/action_cable/connection/faye_event_loop.rb2
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/message_buffer.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb2
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb25
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb8
9 files changed, 60 insertions, 48 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index b4488265cb..cc4e0f8c8b 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -40,7 +40,7 @@ module ActionCable
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
- # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
+ # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
@@ -48,11 +48,11 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions, :logger, :worker_pool
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
delegate :event_loop, :pubsub, to: :server
- def initialize(server, env)
- @server, @env = server, env
+ def initialize(server, env, coder: ActiveSupport::JSON)
+ @server, @env, @coder = server, env, coder
@worker_pool = server.worker_pool
@logger = new_tagged_logger
@@ -67,7 +67,7 @@ module ActionCable
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
- def process # :nodoc:
+ def process #:nodoc:
logger.info started_request_message
if websocket.possible? && allow_request_origin?
@@ -77,20 +77,22 @@ module ActionCable
end
end
- # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded.
- # The data is routed to the proper channel that the connection has subscribed to.
- def receive(data_in_json)
+ # Decodes WebSocket messages and dispatches them to subscribed channels.
+ # WebSocket message transfer encoding is always JSON.
+ def receive(websocket_message) #:nodoc:
+ send_async :dispatch_websocket_message, websocket_message
+ end
+
+ def dispatch_websocket_message(websocket_message) #:nodoc:
if websocket.alive?
- subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
+ subscriptions.execute_command decode(websocket_message)
else
- logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
+ logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
end
end
- # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
- # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
- def transmit(data) # :nodoc:
- websocket.transmit data
+ def transmit(cable_message) # :nodoc:
+ websocket.transmit encode(cable_message)
end
# Close the WebSocket connection.
@@ -115,7 +117,7 @@ module ActionCable
end
def beat
- transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i)
+ transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end
def on_open # :nodoc:
@@ -152,7 +154,16 @@ module ActionCable
attr_reader :message_buffer
private
+ def encode(cable_message)
+ @coder.encode cable_message
+ end
+
+ def decode(websocket_message)
+ @coder.decode websocket_message
+ end
+
def handle_open
+ @protocol = websocket.protocol
connect if respond_to?(:connect)
subscribe_to_internal_channel
send_welcome_message
@@ -178,7 +189,7 @@ module ActionCable
# Send welcome message to the internal connection monitor channel.
# This ensures the connection monitor state is reset after a successful
# websocket connection.
- transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome])
+ transmit type: ActionCable::INTERNAL[:message_types][:welcome]
end
def allow_request_origin?
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index 9e4dbcd6e6..6f29f32ea9 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -29,7 +29,7 @@ module ActionCable
attr_reader :env, :url
- def initialize(env, event_target, event_loop)
+ def initialize(env, event_target, event_loop, protocols)
@env = env
@event_target = event_target
@event_loop = event_loop
@@ -42,7 +42,7 @@ module ActionCable
@ready_state = CONNECTING
# The driver calls +env+, +url+, and +write+
- @driver = ::WebSocket::Driver.rack(self)
+ @driver = ::WebSocket::Driver.rack(self, protocols: protocols)
@driver.on(:open) { |e| open }
@driver.on(:message) { |e| receive_message(e.data) }
@@ -71,6 +71,8 @@ module ActionCable
def write(data)
@stream.write(data)
+ rescue => e
+ emit_error e.message
end
def transmit(message)
@@ -109,6 +111,10 @@ module ActionCable
@ready_state == OPEN
end
+ def protocol
+ @driver.protocol
+ end
+
private
def open
return unless @ready_state == CONNECTING
diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb
index c9139b6858..a4bfe7db17 100644
--- a/actioncable/lib/action_cable/connection/faye_client_socket.rb
+++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb
@@ -3,9 +3,10 @@ require 'faye/websocket'
module ActionCable
module Connection
class FayeClientSocket
- def initialize(env, event_target, stream_event_loop)
+ def initialize(env, event_target, stream_event_loop, protocols)
@env = env
@event_target = event_target
+ @protocols = protocols
@faye = nil
end
@@ -23,6 +24,10 @@ module ActionCable
@faye && @faye.close
end
+ def protocol
+ @faye && @faye.protocol
+ end
+
def rack_response
connect
@faye.rack_response
@@ -31,11 +36,12 @@ module ActionCable
private
def connect
return if @faye
- @faye = Faye::WebSocket.new(@env)
+ @faye = Faye::WebSocket.new(@env, @protocols)
@faye.on(:open) { |event| @event_target.on_open }
@faye.on(:message) { |event| @event_target.on_message(event.data) }
@faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) }
+ @faye.on(:error) { |event| @event_target.on_error(event.message) }
end
end
end
diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb
index 8b70f3d84e..9c44b38bc3 100644
--- a/actioncable/lib/action_cable/connection/faye_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb
@@ -36,7 +36,7 @@ module ActionCable
end
def shutdown
- inner.cancel
+ @inner.cancel
end
end
end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 3c5d39f59a..f70d52f99b 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -11,7 +11,7 @@ module ActionCable
def subscribe_to_internal_channel
if connection_identifier.present?
- callback = -> (message) { process_internal_message(message) }
+ callback = -> (message) { process_internal_message decode(message) }
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
@@ -27,8 +27,6 @@ module ActionCable
end
def process_internal_message(message)
- message = ActiveSupport::JSON.decode(message)
-
case message['type']
when 'disconnect'
logger.info "Removing connection (#{connection_identifier})"
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
index 19f2e6e918..6a80770cae 100644
--- a/actioncable/lib/action_cable/connection/message_buffer.rb
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -30,7 +30,7 @@ module ActionCable
protected
attr_reader :connection
- attr_accessor :buffered_messages
+ attr_reader :buffered_messages
private
def valid?(message)
@@ -38,7 +38,7 @@ module ActionCable
end
def receive(message)
- connection.send_async :receive, message
+ connection.receive message
end
def buffer(message)
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index 2d97b28c09..0cf59091bc 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -29,7 +29,7 @@ module ActionCable
def write(data)
return @rack_hijack_io.write(data) if @rack_hijack_io
return @stream_send.call(data) if @stream_send
- rescue EOFError
+ rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone
end
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index 5aa907c2d3..3742f248d1 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -23,13 +23,13 @@ module ActionCable
end
def add(data)
- id_options = decode_hash(data['identifier'])
- identifier = normalize_identifier(id_options)
+ id_key = data['identifier']
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = connection.server.channel_classes[id_options[:channel]]
if subscription_klass
- subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options)
+ subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
@@ -37,7 +37,7 @@ module ActionCable
def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
- remove_subscription subscriptions[normalize_identifier(data['identifier'])]
+ remove_subscription subscriptions[data['identifier']]
end
def remove_subscription(subscription)
@@ -46,7 +46,7 @@ module ActionCable
end
def perform_action(data)
- find(data).perform_action(decode_hash(data['data']))
+ find(data).perform_action ActiveSupport::JSON.decode(data['data'])
end
def identifiers
@@ -63,21 +63,8 @@ module ActionCable
private
delegate :logger, to: :connection
- def normalize_identifier(identifier)
- identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash)
- identifier
- end
-
- # If `data` is a Hash, this means that the original JSON
- # sent by the client had no backslashes in it, and does
- # not need to be decoded again.
- def decode_hash(data)
- data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash)
- data.with_indifferent_access
- end
-
def find(data)
- if subscription = subscriptions[normalize_identifier(data['identifier'])]
+ if subscription = subscriptions[data['identifier']]
subscription
else
raise "Unable to find subscription with identifier: #{data['identifier']}"
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 0bec9b6a96..11f28c37e8 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -4,8 +4,8 @@ module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
class WebSocket
- def initialize(env, event_target, event_loop, client_socket_class)
- @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil
+ def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols])
+ @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil
end
def possible?
@@ -24,6 +24,10 @@ module ActionCable
websocket.close
end
+ def protocol
+ websocket.protocol
+ end
+
def rack_response
websocket.rack_response
end