diff options
Diffstat (limited to 'actioncable/lib/action_cable/connection')
6 files changed, 58 insertions, 40 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b5f898436a..afe0d958d7 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -2,9 +2,9 @@ require 'action_dispatch' module ActionCable module Connection - # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent - # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions - # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond + # For every WebSocket the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent + # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions + # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. # # Here's a basic example: @@ -33,9 +33,9 @@ module ActionCable # end # end # - # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections - # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many - # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. + # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections + # established for that current_user (and potentially disconnect them). You can declare as many + # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key. # # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. @@ -48,12 +48,13 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool + delegate :stream_event_loop, :pubsub, to: :server def initialize(server, env) @server, @env = server, env + @worker_pool = server.worker_pool @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) @@ -65,8 +66,8 @@ module ActionCable end # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. - # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. - def process + # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. + def process # :nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -76,7 +77,7 @@ module ActionCable end end - # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded. + # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded. # The data is routed to the proper channel that the connection has subscribed to. def receive(data_in_json) if websocket.alive? @@ -88,7 +89,7 @@ module ActionCable # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) + def transmit(data) # :nodoc: websocket.transmit data end @@ -154,7 +155,7 @@ module ActionCable def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel - beat + confirm_connection_monitor_subscription message_buffer.process! server.add_connection(self) @@ -173,6 +174,13 @@ module ActionCable disconnect if respond_to?(:disconnect) end + def confirm_connection_monitor_subscription + # Send confirmation message to the internal connection monitor channel. + # This ensures the connection monitor state is reset after a successful + # websocket connection. + transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], type: ActionCable::INTERNAL[:message_types][:confirmation]) + end + def allow_request_origin? return true if server.config.disable_request_forgery_protection @@ -185,12 +193,14 @@ module ActionCable end def respond_to_successful_request + logger.info successful_request_message websocket.rack_response end def respond_to_invalid_request close if websocket.alive? + logger.error invalid_request_message logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end @@ -205,7 +215,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end @@ -213,10 +223,22 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end + + def invalid_request_message + 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end + + def successful_request_message + 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end end end end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index ef937d7c16..f6b11e93f0 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -50,14 +50,16 @@ module ActionCable @driver.on(:error) { |e| emit_error(e.message) } @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + end + + def start_driver + return if @driver.nil? || @driver_started + @stream.hijack_rack_socket if callback = @env['async.callback'] callback.call([101, {}, @stream]) end - end - def start_driver - return if @driver.nil? || @driver_started @driver_started = true @driver.start end @@ -132,11 +134,8 @@ module ActionCable @ready_state = CLOSING @close_params = [reason, code] - if @stream - @stream.shutdown - else - finalize_close - end + @stream.shutdown if @stream + finalize_close end def finalize_close diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb index 885ff3f102..4a54044aff 100644 --- a/actioncable/lib/action_cable/connection/identification.rb +++ b/actioncable/lib/action_cable/connection/identification.rb @@ -12,7 +12,7 @@ module ActionCable class_methods do # Mark a key as being a connection identifier index that can then be used to find the specific connection again later. - # Common identifiers are current_user and current_account, but could be anything really. + # Common identifiers are current_user and current_account, but could be anything, really. # # Note that anything marked as an identifier will automatically create a delegate by the same name on any # channel instances created off the connection. diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 2f65a1e84a..19f2e6e918 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -1,8 +1,7 @@ module ActionCable module Connection - # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. - # Entirely internal operation and should not be used directly by the user. - class MessageBuffer + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them. + class MessageBuffer # :nodoc: def initialize(connection) @connection = connection @buffered_messages = [] diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index ace250cd16..2d97b28c09 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -4,15 +4,13 @@ module ActionCable # This class is heavily based on faye-websocket-ruby # # Copyright (c) 2010-2015 James Coglan - class Stream + class Stream # :nodoc: def initialize(event_loop, socket) @event_loop = event_loop @socket_object = socket @stream_send = socket.env['stream.send'] @rack_hijack_io = nil - - hijack_rack_socket end def each(&callback) @@ -39,16 +37,16 @@ module ActionCable @socket_object.parse(data) end - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] - @event_loop.attach(@rack_hijack_io, self) - end + @event_loop.attach(@rack_hijack_io, self) + end + private def clean_rack_hijack return unless @rack_hijack_io @event_loop.detach(@rack_hijack_io, self) diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index d7f95e6a62..3742f248d1 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -3,8 +3,8 @@ require 'active_support/core_ext/hash/indifferent_access' module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on - # the connection to the proper channel. Should not be used directly by the user. - class Subscriptions + # the connection to the proper channel. + class Subscriptions # :nodoc: def initialize(connection) @connection = connection @subscriptions = {} @@ -54,7 +54,7 @@ module ActionCable end def unsubscribe_from_all - subscriptions.each { |id, channel| channel.unsubscribe_from_channel } + subscriptions.each { |id, channel| remove_subscription(channel) } end protected |