aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb52
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb15
-rw-r--r--actioncable/lib/action_cable/connection/identification.rb2
-rw-r--r--actioncable/lib/action_cable/connection/message_buffer.rb5
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb18
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb6
6 files changed, 58 insertions, 40 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index b5f898436a..afe0d958d7 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -2,9 +2,9 @@ require 'action_dispatch'
module ActionCable
module Connection
- # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
- # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
- # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
+ # For every WebSocket the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent
+ # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
+ # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
#
# Here's a basic example:
@@ -33,9 +33,9 @@ module ActionCable
# end
# end
#
- # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
- # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
- # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
+ # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections
+ # established for that current_user (and potentially disconnect them). You can declare as many
+ # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
@@ -48,12 +48,13 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions, :logger
- delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool
+ delegate :stream_event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
+ @worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@@ -65,8 +66,8 @@ module ActionCable
end
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
- # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
- def process
+ # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
+ def process # :nodoc:
logger.info started_request_message
if websocket.possible? && allow_request_origin?
@@ -76,7 +77,7 @@ module ActionCable
end
end
- # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
+ # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded.
# The data is routed to the proper channel that the connection has subscribed to.
def receive(data_in_json)
if websocket.alive?
@@ -88,7 +89,7 @@ module ActionCable
# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
- def transmit(data)
+ def transmit(data) # :nodoc:
websocket.transmit data
end
@@ -154,7 +155,7 @@ module ActionCable
def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
- beat
+ confirm_connection_monitor_subscription
message_buffer.process!
server.add_connection(self)
@@ -173,6 +174,13 @@ module ActionCable
disconnect if respond_to?(:disconnect)
end
+ def confirm_connection_monitor_subscription
+ # Send confirmation message to the internal connection monitor channel.
+ # This ensures the connection monitor state is reset after a successful
+ # websocket connection.
+ transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], type: ActionCable::INTERNAL[:message_types][:confirmation])
+ end
+
def allow_request_origin?
return true if server.config.disable_request_forgery_protection
@@ -185,12 +193,14 @@ module ActionCable
end
def respond_to_successful_request
+ logger.info successful_request_message
websocket.rack_response
end
def respond_to_invalid_request
close if websocket.alive?
+ logger.error invalid_request_message
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
@@ -205,7 +215,7 @@ module ActionCable
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
@@ -213,10 +223,22 @@ module ActionCable
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket.possible? ? ' [WebSocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
+
+ def invalid_request_message
+ 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
+
+ def successful_request_message
+ 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
+ env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
+ ]
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index ef937d7c16..f6b11e93f0 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -50,14 +50,16 @@ module ActionCable
@driver.on(:error) { |e| emit_error(e.message) }
@stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+ end
+
+ def start_driver
+ return if @driver.nil? || @driver_started
+ @stream.hijack_rack_socket
if callback = @env['async.callback']
callback.call([101, {}, @stream])
end
- end
- def start_driver
- return if @driver.nil? || @driver_started
@driver_started = true
@driver.start
end
@@ -132,11 +134,8 @@ module ActionCable
@ready_state = CLOSING
@close_params = [reason, code]
- if @stream
- @stream.shutdown
- else
- finalize_close
- end
+ @stream.shutdown if @stream
+ finalize_close
end
def finalize_close
diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb
index 885ff3f102..4a54044aff 100644
--- a/actioncable/lib/action_cable/connection/identification.rb
+++ b/actioncable/lib/action_cable/connection/identification.rb
@@ -12,7 +12,7 @@ module ActionCable
class_methods do
# Mark a key as being a connection identifier index that can then be used to find the specific connection again later.
- # Common identifiers are current_user and current_account, but could be anything really.
+ # Common identifiers are current_user and current_account, but could be anything, really.
#
# Note that anything marked as an identifier will automatically create a delegate by the same name on any
# channel instances created off the connection.
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
index 2f65a1e84a..19f2e6e918 100644
--- a/actioncable/lib/action_cable/connection/message_buffer.rb
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -1,8 +1,7 @@
module ActionCable
module Connection
- # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
- # Entirely internal operation and should not be used directly by the user.
- class MessageBuffer
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them.
+ class MessageBuffer # :nodoc:
def initialize(connection)
@connection = connection
@buffered_messages = []
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index ace250cd16..2d97b28c09 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -4,15 +4,13 @@ module ActionCable
# This class is heavily based on faye-websocket-ruby
#
# Copyright (c) 2010-2015 James Coglan
- class Stream
+ class Stream # :nodoc:
def initialize(event_loop, socket)
@event_loop = event_loop
@socket_object = socket
@stream_send = socket.env['stream.send']
@rack_hijack_io = nil
-
- hijack_rack_socket
end
def each(&callback)
@@ -39,16 +37,16 @@ module ActionCable
@socket_object.parse(data)
end
- private
- def hijack_rack_socket
- return unless @socket_object.env['rack.hijack']
+ def hijack_rack_socket
+ return unless @socket_object.env['rack.hijack']
- @socket_object.env['rack.hijack'].call
- @rack_hijack_io = @socket_object.env['rack.hijack_io']
+ @socket_object.env['rack.hijack'].call
+ @rack_hijack_io = @socket_object.env['rack.hijack_io']
- @event_loop.attach(@rack_hijack_io, self)
- end
+ @event_loop.attach(@rack_hijack_io, self)
+ end
+ private
def clean_rack_hijack
return unless @rack_hijack_io
@event_loop.detach(@rack_hijack_io, self)
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index d7f95e6a62..3742f248d1 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -3,8 +3,8 @@ require 'active_support/core_ext/hash/indifferent_access'
module ActionCable
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
- # the connection to the proper channel. Should not be used directly by the user.
- class Subscriptions
+ # the connection to the proper channel.
+ class Subscriptions # :nodoc:
def initialize(connection)
@connection = connection
@subscriptions = {}
@@ -54,7 +54,7 @@ module ActionCable
end
def unsubscribe_from_all
- subscriptions.each { |id, channel| channel.unsubscribe_from_channel }
+ subscriptions.each { |id, channel| remove_subscription(channel) }
end
protected