aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection/base.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection/base.rb')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb88
1 files changed, 55 insertions, 33 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 1acef93025..dfee123ea2 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -1,10 +1,10 @@
-require 'action_dispatch'
+require "action_dispatch"
module ActionCable
module Connection
- # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
- # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
- # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
+ # For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent
+ # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
+ # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
#
# Here's a basic example:
@@ -33,14 +33,14 @@ module ActionCable
# end
# end
#
- # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
- # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
+ # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections
+ # established for that current_user (and potentially disconnect them). You can declare as many
# identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
- # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
+ # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
@@ -48,15 +48,16 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions, :logger
- delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
+ delegate :event_loop, :pubsub, to: :server
- def initialize(server, env)
- @server, @env = server, env
+ def initialize(server, env, coder: ActiveSupport::JSON)
+ @server, @env, @coder = server, env, coder
+ @worker_pool = server.worker_pool
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -65,8 +66,8 @@ module ActionCable
end
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
- # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
- def process
+ # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
+ def process #:nodoc:
logger.info started_request_message
if websocket.possible? && allow_request_origin?
@@ -76,20 +77,22 @@ module ActionCable
end
end
- # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
- # The data is routed to the proper channel that the connection has subscribed to.
- def receive(data_in_json)
+ # Decodes WebSocket messages and dispatches them to subscribed channels.
+ # WebSocket message transfer encoding is always JSON.
+ def receive(websocket_message) #:nodoc:
+ send_async :dispatch_websocket_message, websocket_message
+ end
+
+ def dispatch_websocket_message(websocket_message) #:nodoc:
if websocket.alive?
- subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
+ subscriptions.execute_command decode(websocket_message)
else
- logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
+ logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
end
end
- # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
- # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
- def transmit(data)
- websocket.transmit data
+ def transmit(cable_message) # :nodoc:
+ websocket.transmit encode(cable_message)
end
# Close the WebSocket connection.
@@ -102,19 +105,19 @@ module ActionCable
worker_pool.async_invoke(self, method, *arguments)
end
- # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
+ # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>.
# This can be returned by a health check against the connection.
def statistics
{
identifier: connection_identifier,
started_at: @started_at,
subscriptions: subscriptions.identifiers,
- request_id: @env['action_dispatch.request_id']
+ request_id: @env["action_dispatch.request_id"]
}
end
def beat
- transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
+ transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end
def on_open # :nodoc:
@@ -151,10 +154,19 @@ module ActionCable
attr_reader :message_buffer
private
+ def encode(cable_message)
+ @coder.encode cable_message
+ end
+
+ def decode(websocket_message)
+ @coder.decode websocket_message
+ end
+
def handle_open
+ @protocol = websocket.protocol
connect if respond_to?(:connect)
subscribe_to_internal_channel
- beat
+ send_welcome_message
message_buffer.process!
server.add_connection(self)
@@ -173,10 +185,20 @@ module ActionCable
disconnect if respond_to?(:disconnect)
end
+ def send_welcome_message
+ # Send welcome message to the internal connection monitor channel.
+ # This ensures the connection monitor state is reset after a successful
+ # websocket connection.
+ transmit type: ActionCable::INTERNAL[:message_types][:welcome]
+ end
+
def allow_request_origin?
return true if server.config.disable_request_forgery_protection
- if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] }
+ proto = Rack::Request.new(env).ssl? ? "https" : "http"
+ if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}"
+ true
+ elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] }
true
else
logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
@@ -194,7 +216,7 @@ module ActionCable
logger.error invalid_request_message
logger.info finished_request_message
- [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
+ [ 404, { "Content-Type" => "text/plain" }, [ "Page not found" ] ]
end
# Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
@@ -207,7 +229,7 @@ module ActionCable
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
+ websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
request.ip,
Time.now.to_s ]
end
@@ -215,19 +237,19 @@ module ActionCable
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
+ websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
request.ip,
Time.now.to_s ]
end
def invalid_request_message
- 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
]
end
def successful_request_message
- 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
]
end